diff --git a/src/query/catalog/src/plan/pushdown.rs b/src/query/catalog/src/plan/pushdown.rs
index 3418ef137491..75d1bb5bea74 100644
--- a/src/query/catalog/src/plan/pushdown.rs
+++ b/src/query/catalog/src/plan/pushdown.rs
@@ -14,7 +14,9 @@
 use std::fmt::Debug;
 
+use common_expression::types::DataType;
 use common_expression::RemoteExpr;
+use common_expression::TableField;
 use common_expression::TableSchema;
 
 use crate::plan::Projection;
@@ -48,7 +50,52 @@ pub struct PushDownInfo {
     pub order_by: Vec<(RemoteExpr<String>, bool, bool)>,
 }
 
+/// TopK is a wrapper for topk push down items.
+/// We only take the first column in order_by as the topk column.
+#[derive(Debug, Clone)]
+pub struct TopK {
+    pub limit: usize,
+    pub order_by: TableField,
+    pub asc: bool,
+    pub column_id: u32,
+}
+
 impl PushDownInfo {
+    pub fn top_k(&self, schema: &TableSchema, support: fn(&DataType) -> bool) -> Option<TopK> {
+        if !self.order_by.is_empty() && self.limit.is_some() {
+            let order = &self.order_by[0];
+            let limit = self.limit.unwrap();
+
+            const MAX_TOPK_LIMIT: usize = 1000;
+            if limit > MAX_TOPK_LIMIT {
+                return None;
+            }
+
+            if let RemoteExpr::<String>::ColumnRef { id, .. } = &order.0 {
+                let field = schema.field_with_name(id).unwrap();
+                let data_type: DataType = field.data_type().into();
+                if !support(&data_type) {
+                    return None;
+                }
+
+                let leaf_fields = schema.leaf_fields();
+                let column_id = leaf_fields.iter().position(|p| p == field).unwrap();
+
+                let top_k = TopK {
+                    limit: self.limit.unwrap(),
+                    order_by: field.clone(),
+                    asc: order.1,
+                    column_id: column_id as u32,
+                };
+                Some(top_k)
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+
     pub fn prewhere_of_push_downs(push_downs: &Option<PushDownInfo>) -> Option<PrewhereInfo> {
         if let Some(PushDownInfo { prewhere, .. }) = push_downs {
             prewhere.clone()
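The extraction rule above is compact but easy to misread: only the first `order_by` item qualifies, it must be a plain column reference of a supported type, and the limit must stay under `MAX_TOPK_LIMIT`. Below is a minimal, self-contained sketch of the same decision logic, using hypothetical stand-in types rather than the real `PushDownInfo`/`RemoteExpr` definitions:

```rust
// Sketch only: OrderByItem/TopK here are toy stand-ins, not Databend types.
const MAX_TOPK_LIMIT: usize = 1000;

#[derive(Debug)]
struct OrderByItem {
    column: Option<String>, // Some(name) only for a bare column reference
    asc: bool,
}

#[derive(Debug)]
struct TopK {
    limit: usize,
    column: String,
    asc: bool,
}

fn extract_top_k(
    order_by: &[OrderByItem],
    limit: Option<usize>,
    supported: fn(&str) -> bool,
) -> Option<TopK> {
    let first = order_by.first()?; // only the first sort key is considered
    let limit = limit.filter(|l| *l <= MAX_TOPK_LIMIT)?; // cap the heap size
    let column = first.column.clone()?; // must be a plain column reference
    if !supported(&column) {
        return None;
    }
    Some(TopK { limit, column, asc: first.asc })
}

fn main() {
    let order_by = vec![OrderByItem { column: Some("ts".to_string()), asc: true }];
    // Pretend every column type is supported in this toy version.
    println!("{:?}", extract_top_k(&order_by, Some(10), |_| true));
}
```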
diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs
index bdc208ef7d86..17f8f0387b3d 100644
--- a/src/query/expression/src/kernels/filter.rs
+++ b/src/query/expression/src/kernels/filter.rs
@@ -20,124 +20,39 @@ use common_arrow::arrow::buffer::Buffer;
 use common_exception::ErrorCode;
 use common_exception::Result;
 
+use crate::filter_helper::FilterHelpers;
 use crate::types::array::ArrayColumnBuilder;
 use crate::types::nullable::NullableColumn;
 use crate::types::number::NumberColumn;
-use crate::types::number::NumberScalar;
 use crate::types::string::StringColumnBuilder;
 use crate::types::AnyType;
-use crate::types::ArgType;
 use crate::types::ArrayType;
 use crate::types::BooleanType;
 use crate::types::StringType;
 use crate::types::ValueType;
 use crate::types::VariantType;
-use crate::with_number_mapped_type;
 use crate::with_number_type;
 use crate::BlockEntry;
 use crate::Column;
 use crate::ColumnBuilder;
 use crate::DataBlock;
-use crate::Scalar;
 use crate::TypeDeserializer;
 use crate::Value;
 
 impl DataBlock {
-    // check if the predicate has any valid row
-    pub fn filter_exists(predicate: &Value<AnyType>) -> Result<bool> {
-        let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
-            ErrorCode::BadDataValueType(format!(
-                "Filter predict column does not support type '{:?}'",
-                predicate
-            ))
-        })?;
-        match predicate {
-            Value::Scalar(s) => Ok(s),
-            Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()),
-        }
-    }
-
     pub fn filter(self, predicate: &Value<AnyType>) -> Result<DataBlock> {
         if self.num_rows() == 0 {
             return Ok(self);
         }
 
-        let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
+        let predicate = FilterHelpers::cast_to_nonull_boolean(predicate).ok_or_else(|| {
             ErrorCode::BadDataValueType(format!(
                 "Filter predict column does not support type '{:?}'",
                 predicate
             ))
         })?;
 
-        match predicate {
-            Value::Scalar(s) => {
-                if s {
-                    Ok(self)
-                } else {
-                    Ok(self.slice(0..0))
-                }
-            }
-            Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
-        }
-    }
-
-    // Must be numeric, boolean, or string value type
-    pub fn cast_to_nonull_boolean(predicate: &Value<AnyType>) -> Option<Value<BooleanType>> {
-        match predicate {
-            Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar),
-            Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column),
-        }
-    }
-
-    fn cast_scalar_to_boolean(s: &Scalar) -> Option<bool> {
-        match s {
-            Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
-                NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()),
-            }),
-            Scalar::Boolean(value) => Some(*value),
-            Scalar::String(value) => Some(!value.is_empty()),
-            Scalar::Timestamp(value) => Some(*value != 0),
-            Scalar::Date(value) => Some(*value != 0),
-            Scalar::Null => Some(false),
-            _ => None,
-        }
-    }
-
-    fn cast_column_to_boolean(c: &Column) -> Option<Bitmap> {
-        match c {
-            Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
-                NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter(
-                    value.iter().map(|v| v != &SRC_TYPE::default()),
-                    &[],
-                )),
-            }),
-            Column::Boolean(value) => Some(value.clone()),
-            Column::String(value) => Some(BooleanType::column_from_iter(
-                value.iter().map(|s| !s.is_empty()),
-                &[],
-            )),
-            Column::Timestamp(value) => Some(BooleanType::column_from_iter(
-                value.iter().map(|v| *v != 0),
-                &[],
-            )),
-            Column::Date(value) => Some(BooleanType::column_from_iter(
-                value.iter().map(|v| *v != 0),
-                &[],
-            )),
-            Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()),
-            Column::Nullable(c) => {
-                let inner = Self::cast_column_to_boolean(&c.column)?;
-                Some((&inner) & (&c.validity))
-            }
-            _ => None,
-        }
-    }
-
-    pub fn try_as_const_bool(value: &Value<BooleanType>) -> Result<Option<bool>> {
-        match value {
-            Value::Scalar(v) => Ok(Some(*v)),
-            _ => Ok(None),
-        }
+        self.filter_boolean_value(predicate)
     }
 
     pub fn filter_with_bitmap(block: DataBlock, bitmap: &Bitmap) -> Result<DataBlock> {
@@ -169,6 +84,19 @@ impl DataBlock {
             }
         }
     }
+
+    pub fn filter_boolean_value(self, filter: Value<BooleanType>) -> Result<DataBlock> {
+        match filter {
+            Value::Scalar(s) => {
+                if s {
+                    Ok(self)
+                } else {
+                    Ok(self.slice(0..0))
+                }
+            }
+            Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
+        }
+    }
 }
 
 impl Column {
diff --git a/src/query/expression/src/kernels/mod.rs b/src/query/expression/src/kernels/mod.rs
index 11c7de9e41aa..8da5e5a65512 100644
--- a/src/query/expression/src/kernels/mod.rs
+++ b/src/query/expression/src/kernels/mod.rs
@@ -20,8 +20,10 @@ mod scatter;
 mod sort;
 mod take;
 mod take_chunks;
+mod topk;
 
 pub use group_by::*;
 pub use group_by_hash::*;
 pub use sort::*;
 pub use take_chunks::*;
+pub use topk::*;
diff --git a/src/query/expression/src/kernels/topk.rs b/src/query/expression/src/kernels/topk.rs
new file mode 100644
index 000000000000..74f32160e1e8
--- /dev/null
+++ b/src/query/expression/src/kernels/topk.rs
@@ -0,0 +1,227 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::cmp::Ordering;
+use std::cmp::Ordering::Less;
+use std::mem;
+use std::ptr;
+
+use common_arrow::arrow::bitmap::MutableBitmap;
+
+use crate::types::*;
+use crate::with_number_mapped_type;
+use crate::Column;
+use crate::Scalar;
+
+pub struct TopKSorter {
+    data: Vec<Scalar>,
+    limit: usize,
+    asc: bool,
+}
+
+impl TopKSorter {
+    pub fn new(limit: usize, asc: bool) -> Self {
+        Self {
+            data: Vec::with_capacity(limit),
+            limit,
+            asc,
+        }
+    }
+
+    // Push the column into this sorter and update the bitmap;
+    // the bitmap can then be used as a filter.
+    pub fn push_column(&mut self, col: &Column, bitmap: &mut MutableBitmap) {
+        with_number_mapped_type!(|NUM_TYPE| match col.data_type() {
+            DataType::Number(NumberDataType::NUM_TYPE) =>
+                self.push_column_internal::<NumberType<NUM_TYPE>>(col, bitmap),
+            DataType::String => self.push_column_internal::<StringType>(col, bitmap),
+            DataType::Timestamp => self.push_column_internal::<TimestampType>(col, bitmap),
+            DataType::Date => self.push_column_internal::<DateType>(col, bitmap),
+            _ => {}
+        });
+    }
+
+    fn push_column_internal<T: ValueType>(&mut self, col: &Column, bitmap: &mut MutableBitmap)
+    where for<'a> T::ScalarRef<'a>: Ord {
+        let col = T::try_downcast_column(col).unwrap();
+        for (i, value) in T::iter_column(&col).enumerate() {
+            if !bitmap.get(i) {
+                continue;
+            }
+
+            if self.data.len() < self.limit {
+                self.data.push(T::upcast_scalar(T::to_owned_scalar(value)));
+                if self.data.len() == self.limit {
+                    self.make_heap();
+                }
+            } else if !self.push_value::<T>(value) {
+                bitmap.set(i, false);
+            }
+        }
+    }
+
+    #[inline]
+    fn push_value<T: ValueType>(&mut self, value: T::ScalarRef<'_>) -> bool
+    where for<'a> T::ScalarRef<'a>: Ord {
+        let order = self.ordering();
+        let data = self.data[0].clone();
+        let data = data.as_ref();
+        let data = T::try_downcast_scalar(&data).unwrap();
+        let value = T::upcast_gat(value);
+
+        if Ord::cmp(&data, &value) != order {
+            self.data[0] = T::upcast_scalar(T::to_owned_scalar(value));
+            self.adjust();
+            true
+        } else {
+            false
+        }
+    }
+
+    #[inline]
+    pub fn never_match(&self, (min, max): &(Scalar, Scalar)) -> bool {
+        if self.data.len() != self.limit {
+            return false;
+        }
+        (self.asc && &self.data[0] < min) || (!self.asc && &self.data[0] > max)
+    }
+
+    #[inline]
+    pub fn never_match_any(&self, col: &Column) -> bool {
+        if self.data.len() != self.limit {
+            return false;
+        }
+        with_number_mapped_type!(|NUM_TYPE| match col.data_type() {
+            DataType::Number(NumberDataType::NUM_TYPE) =>
+                self.never_match_any_internal::<NumberType<NUM_TYPE>>(col),
+            DataType::String => self.never_match_any_internal::<StringType>(col),
+            DataType::Timestamp => self.never_match_any_internal::<TimestampType>(col),
+            DataType::Date => self.never_match_any_internal::<DateType>(col),
+            _ => false,
+        })
+    }
+
+    fn never_match_any_internal<T: ValueType>(&self, col: &Column) -> bool
+    where for<'a> T::ScalarRef<'a>: Ord {
+        let col = T::try_downcast_column(col).unwrap();
+        let data = self.data[0].as_ref();
+
+        for val in T::iter_column(&col) {
+            let data = T::try_downcast_scalar(&data).unwrap();
+            if (self.asc && data >= val) || (!self.asc && data <= val) {
+                return false;
+            }
+        }
+        true
+    }
+
+    fn make_heap(&mut self) {
+        let ordering = self.ordering();
+        let data = self.data.as_mut_slice();
+        make_heap(data, &mut |a, b| a.cmp(b) == ordering);
+    }
+
+    fn adjust(&mut self) {
+        let ordering = self.ordering();
+        let data = self.data.as_mut_slice();
+        adjust_heap(data, 0, data.len(), &mut |a, b| a.cmp(b) == ordering);
+    }
+
+    fn ordering(&self) -> Ordering {
+        if self.asc { Less } else { Less.reverse() }
+    }
+}
+
+#[inline]
+fn make_heap<T, F>(v: &mut [T], is_less: &mut F)
+where F: FnMut(&T, &T) -> bool {
+    let len = v.len();
+    let mut parent = (len - 2) / 2;
+
+    loop {
+        adjust_heap(v, parent, len, is_less);
+        if parent == 0 {
+            return;
+        }
+        parent -= 1;
+    }
+}
+
+/// adjust_heap performs a sift-down adjustment of the heap.
+#[inline]
+fn adjust_heap<T, F>(v: &mut [T], hole_index: usize, len: usize, is_less: &mut F)
+where F: FnMut(&T, &T) -> bool {
+    let mut left_child = hole_index * 2 + 1;
+
+    // SAFETY: we ensure hole_index points to a properly initialized value of type T.
+    let mut tmp = unsafe { mem::ManuallyDrop::new(ptr::read(&v[hole_index])) };
+    let mut hole = InsertionHole {
+        src: &mut *tmp,
+        dest: &mut v[hole_index],
+    };
+    // Panic safety:
+    //
+    // If `is_less` panics at any point during the process, `hole` will get dropped and fill the
+    // hole in `v` with the unconsumed range in `buf`, thus ensuring that `v` still holds every
+    // object it initially held exactly once.
+
+    // SAFETY:
+    // we ensure src/dest point to a properly initialized value of type T
+    // `src` is valid for reads of `count * size_of::<T>()` bytes.
+    // `dest` is valid for writes of `count * size_of::<T>()` bytes.
+    // Both `src` and `dest` are properly aligned.
+    unsafe {
+        while left_child < len {
+            // SAFETY: we ensure left_child and left_child + 1 are within [0, len).
+            if left_child + 1 < len
+                && is_less(v.get_unchecked(left_child), v.get_unchecked(left_child + 1))
+            {
+                left_child += 1;
+            }
+
+            // SAFETY: left_child and hole.dest point to properly initialized values of type T.
+            if is_less(&*tmp, v.get_unchecked(left_child)) {
+                ptr::copy_nonoverlapping(&v[left_child], hole.dest, 1);
+                hole.dest = &mut v[left_child];
+            } else {
+                break;
+            }
+
+            left_child = left_child * 2 + 1;
+        }
+    }
+
+    // This code is adapted from the standard library's sort implementation.
+    // When dropped, copies from `src` into `dest`.
+    struct InsertionHole<T> {
+        src: *mut T,
+        dest: *mut T,
+    }
+
+    impl<T> Drop for InsertionHole<T> {
+        fn drop(&mut self) {
+            // SAFETY:
+            // we ensure src/dest point to a properly initialized value of type T
+            // `src` is valid for reads of `count * size_of::<T>()` bytes.
+            // `dest` is valid for writes of `count * size_of::<T>()` bytes.
+            // Both `src` and `dest` are properly aligned.
+            unsafe {
+                ptr::copy_nonoverlapping(self.src, self.dest, 1);
+            }
+        }
+    }
+}
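`TopKSorter` is the core of the feature: a bounded heap that doubles as a filter. A toy analogue over `i64` (ascending only, using `std::collections::BinaryHeap` instead of the hand-rolled heap above) shows the contract `push_column` relies on — a row stays selected only while it can still enter the top N:

```rust
use std::collections::BinaryHeap;

// Toy analogue of TopKSorter for an ascending TOP-N over i64 values:
// a max-heap of size `limit` holds the current N smallest values, and
// `push` doubles as the filter — it returns false for rows that can no
// longer make it into the final result (those rows get unset in the bitmap).
struct ToyTopK {
    heap: BinaryHeap<i64>, // max-heap: heap.peek() is the current threshold
    limit: usize,
}

impl ToyTopK {
    fn new(limit: usize) -> Self {
        Self { heap: BinaryHeap::with_capacity(limit), limit }
    }

    fn push(&mut self, v: i64) -> bool {
        if self.heap.len() < self.limit {
            self.heap.push(v);
            true
        } else if v < *self.heap.peek().unwrap() {
            // v beats the current worst survivor: replace the heap top.
            self.heap.pop();
            self.heap.push(v);
            true
        } else {
            false // can never appear in the top N
        }
    }
}

fn main() {
    let mut topk = ToyTopK::new(2);
    let values = [5_i64, 1, 9, 3, 2];
    let keep: Vec<bool> = values.iter().map(|v| topk.push(*v)).collect();
    // 9 is rejected outright; 5 survives the push but is later superseded,
    // which is why the real sorter also re-checks whole blocks via never_match.
    println!("{:?}", keep); // [true, true, false, true, true]
}
```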
diff --git a/src/query/expression/src/utils/filter_helper.rs b/src/query/expression/src/utils/filter_helper.rs
new file mode 100644
index 000000000000..5c4382a45d7a
--- /dev/null
+++ b/src/query/expression/src/utils/filter_helper.rs
@@ -0,0 +1,123 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_arrow::arrow::bitmap::Bitmap;
+use common_arrow::arrow::bitmap::MutableBitmap;
+use common_exception::ErrorCode;
+use common_exception::Result;
+
+use crate::arrow::bitmap_into_mut;
+use crate::types::number::NumberScalar;
+use crate::types::AnyType;
+use crate::types::ArgType;
+use crate::types::BooleanType;
+use crate::types::NumberColumn;
+use crate::with_number_mapped_type;
+use crate::Column;
+use crate::Scalar;
+use crate::Value;
+
+pub struct FilterHelpers;
+
+impl FilterHelpers {
+    // check if the predicate has any valid row
+    pub fn filter_exists(predicate: &Value<AnyType>) -> Result<bool> {
+        let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
+            ErrorCode::BadDataValueType(format!(
+                "Filter predict column does not support type '{:?}'",
+                predicate
+            ))
+        })?;
+        match predicate {
+            Value::Scalar(s) => Ok(s),
+            Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()),
+        }
+    }
+
+    // Must be numeric, boolean, or string value type
+    #[inline]
+    pub fn cast_to_nonull_boolean(predicate: &Value<AnyType>) -> Option<Value<BooleanType>> {
+        match predicate {
+            Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar),
+            Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column),
+        }
+    }
+
+    #[inline]
+    pub fn is_all_unset(predicate: &Value<BooleanType>) -> bool {
+        match &predicate {
+            Value::Scalar(v) => !v,
+            Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(),
+        }
+    }
+
+    fn cast_scalar_to_boolean(s: &Scalar) -> Option<bool> {
+        match s {
+            Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
+                NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()),
+            }),
+            Scalar::Boolean(value) => Some(*value),
+            Scalar::String(value) => Some(!value.is_empty()),
+            Scalar::Timestamp(value) => Some(*value != 0),
+            Scalar::Date(value) => Some(*value != 0),
+            Scalar::Null => Some(false),
+            _ => None,
+        }
+    }
+
+    fn cast_column_to_boolean(c: &Column) -> Option<Bitmap> {
+        match c {
+            Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
+                NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter(
+                    value.iter().map(|v| v != &SRC_TYPE::default()),
+                    &[],
+                )),
+            }),
+            Column::Boolean(value) => Some(value.clone()),
+            Column::String(value) => Some(BooleanType::column_from_iter(
+                value.iter().map(|s| !s.is_empty()),
+                &[],
+            )),
+            Column::Timestamp(value) => Some(BooleanType::column_from_iter(
+                value.iter().map(|v| *v != 0),
+                &[],
+            )),
+            Column::Date(value) => Some(BooleanType::column_from_iter(
+                value.iter().map(|v| *v != 0),
+                &[],
+            )),
+            Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()),
+            Column::Nullable(c) => {
+                let inner = Self::cast_column_to_boolean(&c.column)?;
+                Some((&inner) & (&c.validity))
+            }
+            _ => None,
+        }
+    }
+
+    pub fn try_as_const_bool(value: &Value<BooleanType>) -> Result<Option<bool>> {
+        match value {
+            Value::Scalar(v) => Ok(Some(*v)),
+            _ => Ok(None),
+        }
+    }
+
+    pub fn filter_to_bitmap(predicate: Value<BooleanType>, rows: usize) -> MutableBitmap {
+        match predicate {
+            Value::Scalar(true) => MutableBitmap::from_len_set(rows),
+            Value::Scalar(false) => MutableBitmap::from_len_zeroed(rows),
+            Value::Column(bitmap) => bitmap_into_mut(bitmap),
+        }
+    }
+}
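`FilterHelpers` centralizes the truthiness rules that `filter.rs` previously owned. A small sketch of the scalar half of those rules, over a hypothetical `ToyScalar` enum (the real code additionally handles timestamps, dates, nullable columns, and whole columns):

```rust
// Sketch of the truthiness rules FilterHelpers::cast_scalar_to_boolean encodes,
// over a simplified scalar enum (ToyScalar is hypothetical, not a Databend type):
// numbers are true when non-default (non-zero), strings when non-empty,
// NULL is always false, and unsupported types would yield None instead of a guess.
#[derive(Debug)]
enum ToyScalar {
    Null,
    Boolean(bool),
    Int(i64),
    String(String),
}

fn to_bool(s: &ToyScalar) -> Option<bool> {
    match s {
        ToyScalar::Null => Some(false),
        ToyScalar::Boolean(b) => Some(*b),
        ToyScalar::Int(v) => Some(*v != i64::default()), // non-zero is true
        ToyScalar::String(v) => Some(!v.is_empty()),
    }
}

fn main() {
    assert_eq!(to_bool(&ToyScalar::Null), Some(false));
    assert_eq!(to_bool(&ToyScalar::Int(0)), Some(false));
    assert_eq!(to_bool(&ToyScalar::Int(7)), Some(true));
    assert_eq!(to_bool(&ToyScalar::String(String::new())), Some(false));
    println!("truthiness rules hold");
}
```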
diff --git a/src/query/expression/src/utils/mod.rs b/src/query/expression/src/utils/mod.rs
index 9a96cf3ab4cc..4b1ffd2c4b6c 100644
--- a/src/query/expression/src/utils/mod.rs
+++ b/src/query/expression/src/utils/mod.rs
@@ -19,6 +19,7 @@ pub mod block_thresholds;
 mod column_from;
 pub mod date_helper;
 pub mod display;
+pub mod filter_helper;
 
 use common_arrow::arrow::bitmap::Bitmap;
 use common_arrow::arrow::chunk::Chunk as ArrowChunk;
diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs
index c0b2ba73ec8f..7b19b6673fe0 100755
--- a/src/query/expression/src/values.rs
+++ b/src/query/expression/src/values.rs
@@ -87,7 +87,7 @@ pub enum Scalar {
     Variant(Vec<u8>),
 }
 
-#[derive(Clone, Default, EnumAsInner)]
+#[derive(Clone, Default, Eq, EnumAsInner)]
 pub enum ScalarRef<'a> {
     #[default]
     Null,
@@ -415,6 +415,12 @@ impl PartialOrd for ScalarRef<'_> {
     }
 }
 
+impl Ord for ScalarRef<'_> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.partial_cmp(other).unwrap_or(Ordering::Equal)
+    }
+}
+
 impl PartialEq for ScalarRef<'_> {
     fn eq(&self, other: &Self) -> bool {
         self.partial_cmp(other) == Some(Ordering::Equal)
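The new `Ord` impl for `ScalarRef` lets the TopK heap compare scalars without panicking: where `partial_cmp` returns `None` (e.g. across incompatible variants), the pair is simply treated as equal. The same pattern in isolation, with a hypothetical `TotalF64` wrapper standing in for `ScalarRef`:

```rust
use std::cmp::Ordering;

// Sketch of the Ord-from-PartialOrd fallback added to ScalarRef: values that
// are not mutually comparable are treated as equal rather than panicking.
// TotalF64 is a hypothetical stand-in; the real code compares scalar variants.
#[derive(PartialEq, PartialOrd)]
struct TotalF64(f64);

impl Eq for TotalF64 {}

impl Ord for TotalF64 {
    fn cmp(&self, other: &Self) -> Ordering {
        // NaN comparisons return None; fall back to Equal, as ScalarRef does.
        self.partial_cmp(other).unwrap_or(Ordering::Equal)
    }
}

fn main() {
    let mut v = vec![TotalF64(2.0), TotalF64(f64::NAN), TotalF64(1.0)];
    v.sort(); // safe: cmp is total even in the presence of NaN
    println!("{:?}", v.iter().map(|x| x.0).collect::<Vec<_>>());
}
```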
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs
index 115f457811e6..ce0ba4f1c7b2 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_final_parallel.rs
@@ -143,7 +143,7 @@ where Method: HashMethod + PolymorphicKeysHelper<Method> + Send + 'static
     params: Arc<AggregatorParams>,
 
     hash_table: Method::HashTable,
-    reach_limit: bool,
+    pub(crate) reach_limit: bool,
     // used for deserialization only, so we can reuse it during the loop
     temp_place: Option<StateAddr>,
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs
index b0f838e87778..b3fdd9f4a196 100644
--- a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs
+++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs
@@ -19,6 +19,7 @@ use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::arrow::constant_bitmap;
 use common_expression::arrow::or_validities;
+use common_expression::filter_helper::FilterHelpers;
 use common_expression::types::nullable::NullableColumn;
 use common_expression::types::AnyType;
 use common_expression::types::DataType;
@@ -195,7 +196,7 @@ impl JoinHashTable {
         let filter_vector: Value<AnyType> = evaluator
             .run(filter)
             .map_err(|(_, e)| ErrorCode::Internal(format!("Invalid expression: {}", e)))?;
-        let predict_boolean_nonull = DataBlock::cast_to_nonull_boolean(&filter_vector)
+        let predict_boolean_nonull = FilterHelpers::cast_to_nonull_boolean(&filter_vector)
             .ok_or_else(|| ErrorCode::Internal("Cannot get the boolean column"))?;
 
         match predict_boolean_nonull {
diff --git a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
index e40b323593d7..a44edfd859ce 100644
--- a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs
@@ -109,7 +109,7 @@ fn test_to_partitions() -> Result<()> {
     let column_nodes = ColumnNodes { column_nodes };
 
     // CASE I: no projection
-    let (s, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, None);
+    let (s, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, None, None);
     assert_eq!(parts.len(), num_of_block as usize);
     let expected_block_size: u64 = cols_metas
         .values()
@@ -141,7 +141,7 @@ fn test_to_partitions() -> Result<()> {
         prewhere: None,
     });
 
-    let (stats, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, push_down);
+    let (stats, parts) = FuseTable::to_partitions(&blocks_metas, &column_nodes, None, push_down);
 
     assert_eq!(parts.len(), num_of_block as usize);
     assert_eq!(expected_block_size * num_of_block, stats.read_bytes as u64);
diff --git a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs
index b2d960421db7..f6aec6ee6a71 100644
--- a/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs
+++ b/src/query/sql/src/planner/optimizer/heuristic/heuristic.rs
@@ -44,12 +44,12 @@ pub static DEFAULT_REWRITE_RULES: Lazy<Vec<RuleID>> = Lazy::new(|| {
         RuleID::PushDownLimitAggregate,
         RuleID::PushDownLimitOuterJoin,
         RuleID::PushDownLimitScan,
-        RuleID::PushDownSortScan,
         RuleID::PushDownFilterEvalScalar,
         RuleID::PushDownFilterJoin,
         RuleID::FoldCountAggregate,
        RuleID::SplitAggregate,
         RuleID::PushDownFilterScan,
+        RuleID::PushDownSortScan,
     ]
 });
 
@@ -99,6 +99,12 @@ impl HeuristicOptimizer {
         let optimized = self.optimize_expression(&pre_optimized)?;
         let post_optimized = self.post_optimize(optimized)?;
 
+        // Run the rules again, since some may be missed after post-optimization,
+        // for example: pushing down sort + limit (TopN) to the scan.
+        // TODO: if we push the filter down to the scan, we need to remove the Filter plan.
+        let optimized = self.optimize_expression(&post_optimized)?;
+        let post_optimized = self.post_optimize(optimized)?;
+
         Ok(post_optimized)
     }
 
@@ -126,7 +132,6 @@ impl HeuristicOptimizer {
             // Recursive optimize the result
             let result = &state.results()[0];
             let optimized_result = self.optimize_expression(result)?;
-
             return Ok(optimized_result);
         }
     }
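Why run `optimize_expression`/`post_optimize` twice? A rule that fires late (limit pushed below sort) can enable a rule that already ran (sort + limit folded into the scan). A toy illustration of that pass-ordering effect — the names are invented, not the real optimizer API:

```rust
// Sketch of why one optimizer pass can miss a rewrite the next pass catches.
fn apply_rules(expr: &str) -> String {
    // Toy rewrite chain: each rule only fires when its input pattern exists.
    match expr {
        "Limit(Sort(Scan))" => "Sort+Limit(Scan)".to_string(), // PushDownLimitSort
        "Sort+Limit(Scan)" => "Scan[topk]".to_string(),        // PushDownSortScan
        other => other.to_string(),
    }
}

fn main() {
    let once = apply_rules("Limit(Sort(Scan))");
    let twice = apply_rules(&once);
    // A single pass stops at "Sort+Limit(Scan)"; the second pass reaches
    // the scan-level TopK form — the situation the diff's comment describes.
    println!("after one pass: {once}; after two: {twice}");
}
```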
diff --git a/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs b/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs
index 8751a491a9f0..3eab1d78eeb0 100644
--- a/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs
+++ b/src/query/sql/src/planner/optimizer/heuristic/prewhere_optimization.rs
@@ -92,7 +92,6 @@ impl PrewhereOptimizer {
     }
 
     pub fn prewhere_optimize(&self, s_expr: SExpr) -> Result<SExpr> {
-        let rel_op = s_expr.plan();
         if s_expr.match_pattern(&self.pattern) {
             let filter: Filter = s_expr.plan().clone().try_into()?;
             let mut get: Scan = s_expr.child(0)?.plan().clone().try_into()?;
@@ -132,7 +131,7 @@
                 .iter()
                 .map(|expr| self.prewhere_optimize(expr.clone()))
                 .collect::<Result<Vec<_>>>()?;
-            Ok(SExpr::create(rel_op.clone(), children, None, None))
+            Ok(s_expr.replace_children(children))
         }
     }
diff --git a/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs b/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs
index 1792425133c6..9a96dffdb17c 100644
--- a/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs
+++ b/src/query/sql/src/planner/optimizer/heuristic/prune_unused_columns.rs
@@ -38,7 +38,9 @@ impl UnusedColumnPruner {
     }
 
     pub fn remove_unused_columns(&self, expr: &SExpr, require_columns: ColumnSet) -> Result<SExpr> {
-        Self::keep_required_columns(expr, require_columns)
+        let mut s_expr = Self::keep_required_columns(expr, require_columns)?;
+        s_expr.applied_rules = expr.applied_rules.clone();
+        Ok(s_expr)
     }
 
     /// Keep columns referenced by parent plan node.
@@ -161,17 +163,9 @@ impl UnusedColumnPruner {
                     .is_none()
                     && p.group_items.is_empty()
                 {
-                    required.insert(
-                        *rel_prop
-                            .output_columns
-                            .iter()
-                            .sorted()
-                            .take(1)
-                            .next()
-                            .ok_or_else(|| {
-                                ErrorCode::Internal("Invalid children without output column")
-                            })?,
-                    );
+                    if let Some(index) = rel_prop.output_columns.iter().sorted().take(1).next() {
+                        required.insert(*index);
+                    }
                 }
 
                 p.group_items.iter().for_each(|i| {
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs
index 76f0c2a3b8d5..c726c27765bc 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_aggregate.rs
@@ -78,7 +78,7 @@ impl Rule for RulePushDownLimitAggregate {
             agg_limit.limit = Some(agg_limit.limit.map_or(count, |c| cmp::max(c, count)));
             let agg = SExpr::create_unary(RelOperator::Aggregate(agg_limit), agg.child(0)?.clone());
 
-            let mut result = SExpr::create_unary(limit.into(), agg);
+            let mut result = s_expr.replace_children(vec![agg]);
             result.set_applied_rule(&self.id);
             state.add_result(result);
         }
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs
index 991ade0ed07d..9af826b03b97 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_expression.rs
@@ -72,7 +72,9 @@ impl Rule for RulePushDownLimitExpression {
         let limit_expr =
             SExpr::create_unary(RelOperator::Limit(limit), eval_plan.child(0)?.clone());
-        let result = SExpr::create_unary(RelOperator::EvalScalar(eval_scalar), limit_expr);
+        let mut result = SExpr::create_unary(RelOperator::EvalScalar(eval_scalar), limit_expr);
+
+        result.set_applied_rule(&self.id);
         state.add_result(result);
         Ok(())
     }
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs
index 46d8d8fc49d1..a888522d155c 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_join.rs
@@ -86,13 +86,15 @@ impl Rule for RulePushDownLimitOuterJoin {
                 let join: Join = child.plan().clone().try_into()?;
                 match join.join_type {
                     JoinType::Left | JoinType::Full => {
-                        state.add_result(s_expr.replace_children(vec![child.replace_children(vec![
+                        let mut result = s_expr.replace_children(vec![child.replace_children(vec![
                             SExpr::create_unary(
                                 RelOperator::Limit(limit.clone()),
                                 child.child(0)?.clone(),
                             ),
                             SExpr::create_unary(RelOperator::Limit(limit), child.child(1)?.clone()),
-                        ])]))
+                        ])]);
+                        result.set_applied_rule(&self.id);
+                        state.add_result(result)
                     }
                     _ => {}
                 }
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs
index 96c47698ae61..94d72766e7ca 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_scan.rs
@@ -73,7 +73,10 @@ impl Rule for RulePushDownLimitScan {
                 count += limit.offset;
                 get.limit = Some(get.limit.map_or(count, |c| cmp::max(c, count)));
                 let get = SExpr::create_leaf(RelOperator::Scan(get));
-                state.add_result(s_expr.replace_children(vec![get]));
+
+                let mut result = s_expr.replace_children(vec![get]);
+                result.set_applied_rule(&self.id);
+                state.add_result(result);
             }
             Ok(())
         }
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs
index 81eb90ffcb99..326211524f97 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_sort.rs
@@ -74,7 +74,7 @@ impl Rule for RulePushDownLimitSort {
             sort_limit.limit = Some(sort_limit.limit.map_or(count, |c| cmp::max(c, count)));
             let sort = SExpr::create_unary(RelOperator::Sort(sort_limit), sort.child(0)?.clone());
 
-            let mut result = SExpr::create_unary(limit.into(), sort);
+            let mut result = s_expr.replace_children(vec![sort]);
             result.set_applied_rule(&self.id);
             state.add_result(result);
         }
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs
index 093c9f7708ca..244333a9414e 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_limit_union.rs
@@ -98,8 +98,9 @@ impl Rule for RulePushDownLimitUnion {
         union_right_child = SExpr::create_unary(new_limit.into(), union_right_child);
 
         let mut result = SExpr::create_binary(union.into(), union_left_child, union_right_child);
+
         // Add original limit to top
-        result = SExpr::create_unary(limit.into(), result);
+        result = s_expr.replace_children(vec![result]);
 
         result.set_applied_rule(&self.id);
         state.add_result(result);
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs
index e4783c086202..c7c640222764 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_push_down_sort_scan.rs
@@ -76,7 +76,10 @@ impl Rule for RulePushDownSortScan {
             get.limit = Some(get.limit.map_or(limit, |c| cmp::max(c, limit)));
         }
         let get = SExpr::create_leaf(RelOperator::Scan(get));
-        state.add_result(s_expr.replace_children(vec![get]));
+
+        let mut result = s_expr.replace_children(vec![get]);
+        result.set_applied_rule(&self.id);
+        state.add_result(result);
 
         Ok(())
     }
diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs
index e29411fca363..2e509fec094c 100644
--- a/src/query/storages/fuse/src/fuse_part.rs
+++ b/src/query/storages/fuse/src/fuse_part.rs
@@ -24,6 +24,7 @@ use common_catalog::plan::PartInfo;
 use common_catalog::plan::PartInfoPtr;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::Scalar;
 use storages_common_table_meta::meta::ColumnMeta;
 use storages_common_table_meta::meta::Compression;
 
@@ -37,6 +38,7 @@ pub struct FusePartInfo {
     pub columns_meta: HashMap<usize, ColumnMeta>,
     pub compression: Compression,
 
+    pub sort_min_max: Option<(Scalar, Scalar)>,
     /// page range in the file
     pub range: Option<Range<usize>>,
 }
@@ -68,6 +70,7 @@ impl FusePartInfo {
         rows_count: u64,
         columns_meta: HashMap<usize, ColumnMeta>,
         compression: Compression,
+        sort_min_max: Option<(Scalar, Scalar)>,
         range: Option<Range<usize>>,
     ) -> Arc<Box<dyn PartInfo>> {
         Arc::new(Box::new(FusePartInfo {
@@ -76,6 +79,7 @@
             columns_meta,
             nums_rows: rows_count as usize,
             compression,
+            sort_min_max,
             range,
         }))
     }
diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs
index 9ed904ef64e8..b1951fca5050 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_native.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_native.rs
@@ -151,6 +151,7 @@ impl BlockReader {
             if let Some(range) = range {
                 column_meta = column_meta.slice(range.start, range.end);
             }
+
             let (offset, length) = (
                 column_meta.offset,
                 column_meta.pages.iter().map(|p| p.length).sum::<u64>(),
diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs
index a4cd508897d7..890609465eab 100644
--- a/src/query/storages/fuse/src/operations/delete.rs
+++ b/src/query/storages/fuse/src/operations/delete.rs
@@ -24,6 +24,7 @@ use common_catalog::table::Table;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::filter_helper::FilterHelpers;
 use common_expression::types::DataType;
 use common_expression::BlockEntry;
 use common_expression::Column;
@@ -170,7 +171,7 @@ impl FuseTable {
         let res = evaluator
             .run(&filter_expr)
             .map_err(|(_, e)| ErrorCode::Internal(format!("eval try eval const failed: {}.", e)))?;
-        let predicates = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| {
+        let predicates = FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| {
             ErrorCode::BadArguments("Result of filter expression cannot be converted to boolean.")
         })?;
diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs
index cbbb20f5c9c9..df84b7d06099 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs
@@ -22,6 +22,7 @@ use common_catalog::plan::PartInfoPtr;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::filter_helper::FilterHelpers;
 use common_expression::types::AnyType;
 use common_expression::types::DataType;
 use common_expression::BlockEntry;
@@ -182,11 +183,12 @@ impl Processor for MutationSource {
                     let res = evaluator.run(filter).map_err(|(_, e)| {
                         ErrorCode::Internal(format!("eval filter failed: {}.", e))
                     })?;
-                    let predicates = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| {
-                        ErrorCode::BadArguments(
-                            "Result of filter expression cannot be converted to boolean.",
-                        )
-                    })?;
+                    let predicates =
+                        FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| {
+                            ErrorCode::BadArguments(
+                                "Result of filter expression cannot be converted to boolean.",
+                            )
+                        })?;
 
                     let affect_rows = match &predicates {
                         Value::Scalar(v) => {
diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
index 5fc84ef2ec82..40b31224d180 100644
--- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
+++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::any::Any;
+use std::collections::VecDeque;
 use std::sync::Arc;
 
 use common_base::base::Progress;
@@ -20,15 +21,19 @@ use common_base::base::ProgressValues;
 use common_catalog::plan::DataSourcePlan;
 use common_catalog::plan::PartInfoPtr;
 use common_catalog::plan::PushDownInfo;
+use common_catalog::plan::TopK;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::filter_helper::FilterHelpers;
+use common_expression::Column;
 use common_expression::ConstantFolder;
 use common_expression::DataBlock;
 use common_expression::DataSchema;
 use common_expression::Evaluator;
 use common_expression::Expr;
 use common_expression::FunctionContext;
+use common_expression::TopKSorter;
 use common_expression::Value;
 use common_functions::scalars::BUILTIN_FUNCTIONS;
 use common_pipeline_core::processors::port::InputPort;
@@ -36,6 +41,8 @@ use common_pipeline_core::processors::port::OutputPort;
 use common_pipeline_core::processors::processor::Event;
 use common_pipeline_core::processors::processor::ProcessorPtr;
 use common_pipeline_core::processors::Processor;
+use storages_common_index::Index;
+use storages_common_index::RangeIndex;
 
 use crate::fuse_part::FusePartInfo;
 use crate::io::BlockReader;
@@ -51,8 +58,8 @@ pub struct NativeDeserializeDataTransform {
     input: Arc<InputPort>,
     output: Arc<OutputPort>,
     output_data: Option<DataBlock>,
-    parts: Vec<PartInfoPtr>,
-    chunks: Vec<DataChunks>,
+    parts: VecDeque<PartInfoPtr>,
+    chunks: VecDeque<DataChunks>,
 
     prewhere_columns: Vec<usize>,
     remain_columns: Vec<usize>,
@@ -61,7 +68,11 @@
     output_schema: DataSchema,
     prewhere_filter: Arc<Option<Expr>>,
 
-    prewhere_skipped: usize,
+    skipped_page: usize,
+
+    read_columns: Vec<usize>,
+    top_k: Option<(TopK, TopKSorter, usize)>,
+    topn_finish: bool,
 }
 
 impl NativeDeserializeDataTransform {
@@ -73,15 +84,24 @@
         output: Arc<OutputPort>,
     ) -> Result<ProcessorPtr> {
         let scan_progress = ctx.get_scan_progress();
+
+        let table_schema = plan.source_info.schema();
         let src_schema: DataSchema = (block_reader.schema().as_ref()).into();
 
-        let prewhere_columns: Vec<usize> =
+        let top_k = plan
+            .push_downs
+            .as_ref()
+            .map(|p| p.top_k(table_schema.as_ref(), RangeIndex::supported_type))
+            .unwrap_or_default();
+
+        let mut prewhere_columns: Vec<usize> =
             match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) {
                 None => (0..src_schema.num_fields()).collect(),
                 Some(v) => {
                     let projected_arrow_schema = v
                         .prewhere_columns
                         .project_schema(plan.source_info.schema().as_ref());
+
                     projected_arrow_schema
                         .fields()
                         .iter()
@@ -90,6 +110,17 @@
                 }
             };
 
+        let top_k = top_k.map(|top_k| {
+            let index = src_schema.index_of(top_k.order_by.name()).unwrap();
+            let sorter = TopKSorter::new(top_k.limit, top_k.asc);
+
+            if !prewhere_columns.contains(&index) {
+                prewhere_columns.push(index);
+                prewhere_columns.sort();
+            }
+            (top_k, sorter, index)
+        });
+
         let remain_columns: Vec<usize> = (0..src_schema.num_fields())
             .filter(|i| !prewhere_columns.contains(i))
            .collect();
@@ -117,15 +148,18 @@
             input,
             output,
             output_data: None,
-            parts: vec![],
-            chunks: vec![],
+            parts: VecDeque::new(),
+            chunks: VecDeque::new(),
             prewhere_columns,
             remain_columns,
             src_schema,
             output_schema,
             prewhere_filter,
-            prewhere_skipped: 0,
+            skipped_page: 0,
+            top_k,
+            topn_finish: false,
+            read_columns: vec![],
            },
        )))
    }
@@ -148,6 +182,45 @@
             },
         )
     }
+
+    fn add_block(&mut self, data_block: DataBlock) -> Result<()> {
+        let rows = data_block.num_rows();
+        if rows == 0 {
+            return Ok(());
+        }
+        let progress_values = ProgressValues {
+            rows,
+            bytes: data_block.memory_size(),
+        };
+        self.scan_progress.incr(&progress_values);
+        self.output_data = Some(data_block);
+        Ok(())
+    }
+
+    /// Check whether TopN reading can finish early: once the heap is full,
+    /// a next part whose sort-column range can never beat the heap means we are done.
+    fn check_topn(&mut self) {
+        if let Some((_, sorter, _)) = &mut self.top_k {
+            if let Some(next_part) = self.parts.front() {
+                let next_part = next_part.as_any().downcast_ref::<FusePartInfo>().unwrap();
+                if next_part.sort_min_max.is_none() {
+                    return;
+                }
+                if let Some(sort_min_max) = &next_part.sort_min_max {
+                    self.topn_finish = sorter.never_match(sort_min_max);
+                }
+            }
+        }
+    }
+
+    fn skip_chunks_page(read_columns: &[usize], chunks: &mut DataChunks) -> Result<()> {
+        for (index, chunk) in chunks.iter_mut().enumerate() {
+            if read_columns.contains(&index) {
+                continue;
+            }
+            chunk.1.skip_page()?;
+        }
+        Ok(())
+    }
 }
 
 impl Processor for NativeDeserializeDataTransform {
@@ -179,10 +252,15 @@
             if !self.input.has_data() {
                 self.input.set_need_data();
             }
-
             return Ok(Event::Sync);
         }
 
+        if self.topn_finish {
+            self.input.finish();
+            self.output.finish();
+            return Ok(Event::Finished);
+        }
+
         if self.input.has_data() {
             let mut data_block = self.input.pull_data().unwrap()?;
             if let Some(mut source_meta) = data_block.take_meta() {
@@ -190,8 +268,15 @@
                     .as_mut_any()
                     .downcast_mut::<NativeDataSourceMeta>()
                 {
-                    self.parts = source_meta.part.clone();
-                    self.chunks = std::mem::take(&mut source_meta.chunks);
+                    self.parts = VecDeque::from(std::mem::take(&mut source_meta.part));
+
+                    self.check_topn();
+                    if self.topn_finish {
+                        self.input.finish();
+                        self.output.finish();
+                        return Ok(Event::Finished);
+                    }
+                    self.chunks = VecDeque::from(std::mem::take(&mut source_meta.chunks));
                     return Ok(Event::Sync);
                 }
             }
@@ -200,7 +285,7 @@
         }
 
         if self.input.is_finished() {
-            metrics_inc_pruning_prewhere_nums(self.prewhere_skipped as u64);
+            metrics_inc_pruning_prewhere_nums(self.skipped_page as u64);
             self.output.finish();
             return Ok(Event::Finished);
         }
@@ -210,30 +295,62 @@
     }
 
     fn process(&mut self) -> Result<()> {
-        if let Some(chunks) = self.chunks.last_mut() {
+        if let Some(chunks) = self.chunks.front_mut() {
             // this means it's empty projection
             if chunks.is_empty() {
-                let _ = self.chunks.pop();
-                let part = self.parts.pop().unwrap();
+                let _ = self.chunks.pop_front();
+                let part = self.parts.pop_front().unwrap();
+
                 let part = FusePartInfo::from_part(&part)?;
                 let data_block = DataBlock::new(vec![], part.nums_rows);
-                let progress_values = ProgressValues {
-                    rows: data_block.num_rows(),
-                    bytes: data_block.memory_size(),
-                };
-                self.scan_progress.incr(&progress_values);
-                self.output_data = Some(data_block);
+                self.add_block(data_block)?;
                 return Ok(());
             }
 
             let mut arrays = Vec::with_capacity(chunks.len());
+            self.read_columns.clear();
+
+            // Step 1: check the TopK column first. If prewhere_columns contains more than
+            // just the TopK column, read it alone and test whether any of its values can
+            // still enter the heap.
+            if self.prewhere_columns.len() > 1 {
+                if let Some((top_k, sorter, index)) = self.top_k.as_mut() {
+                    let chunk = chunks.get_mut(*index).unwrap();
+                    if !chunk.1.has_next() {
+                        // No data anymore
+                        let _ = self.chunks.pop_front();
+                        let _ = self.parts.pop_front().unwrap();
+                        self.check_topn();
+                        // check finished
+                        return Ok(());
+                    }
+                    let array = chunk.1.next_array()?;
+                    self.read_columns.push(*index);
+
+                    let data_type = top_k.order_by.data_type().into();
+                    let col = Column::from_arrow(array.as_ref(), &data_type);
+
+                    arrays.push((chunk.0, array));
+                    if sorter.never_match_any(&col) {
+                        self.skipped_page += 1;
+                        return Self::skip_chunks_page(&self.read_columns, chunks);
+                    }
+                }
+            }
+
+            // Step 2: read the prewhere columns and get the filter
             for index in self.prewhere_columns.iter() {
+                if self.read_columns.contains(index) {
+                    continue;
+                }
                 let chunk = chunks.get_mut(*index).unwrap();
                 if !chunk.1.has_next() {
                     // No data anymore
-                    let _ = self.chunks.pop();
+                    let _ = self.chunks.pop_front();
+                    let _ = self.parts.pop_front().unwrap();
+
+                    self.check_topn();
                     return Ok(());
                 }
+                self.read_columns.push(*index);
                 arrays.push((chunk.0, chunk.1.next_array()?));
             }
@@ -245,20 +362,38 @@
                 let result = evaluator.run(filter).map_err(|(_, e)| {
                     ErrorCode::Internal(format!("eval prewhere filter failed: {}.", e))
                 })?;
-                let filter = DataBlock::cast_to_nonull_boolean(&result).unwrap();
+                let filter = FilterHelpers::cast_to_nonull_boolean(&result).unwrap();
+
+                // Step 3: apply the filter; if everything is filtered out, skip the remaining columns.
+                if FilterHelpers::is_all_unset(&filter) {
+                    self.skipped_page += 1;
+                    return Self::skip_chunks_page(&self.read_columns, chunks);
+                }
 
-                let all_filtered = match &filter {
-                    Value::Scalar(v) => !v,
-                    Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(),
+                // Step 4: push the surviving rows into the TopK sorter and refine the bitmap;
+                // this may filter out even more rows.
+                let filter = if let Some((_, sorter, index)) = &mut self.top_k {
+                    let index_prewhere = self
+                        .prewhere_columns
+                        .iter()
+                        .position(|x| x == index)
+                        .unwrap();
+                    let top_k_column = prewhere_block
+                        .get_by_offset(index_prewhere)
+                        .value
+                        .as_column()
+                        .unwrap();
+
+                    let mut bitmap =
+                        FilterHelpers::filter_to_bitmap(filter, prewhere_block.num_rows());
+                    sorter.push_column(top_k_column, &mut bitmap);
+                    Value::Column(bitmap.into())
+                } else {
+                    filter
                 };
 
-                if all_filtered {
-                    self.prewhere_skipped += 1;
-                    for index in self.remain_columns.iter() {
-                        let chunk = chunks.get_mut(*index).unwrap();
-                        chunk.1.skip_page()?;
-                    }
-                    return Ok(());
+                if FilterHelpers::is_all_unset(&filter) {
+                    self.skipped_page += 1;
+                    return Self::skip_chunks_page(&self.read_columns, chunks);
                 }
 
                 for index in self.remain_columns.iter() {
@@ -268,7 +403,7 @@
                     let block = self.block_reader.build_block(arrays)?;
                     let block = block.resort(&self.src_schema, &self.output_schema)?;
-                    block.filter(&result)
+                    block.filter_boolean_value(filter)
                 }
                 None => {
                     for index in self.remain_columns.iter() {
@@ -279,12 +414,8 @@
                 }
             }?;
 
-            let progress_values = ProgressValues {
-                rows: data_block.num_rows(),
-                bytes: data_block.memory_size(),
-            };
-            self.scan_progress.incr(&progress_values);
-            self.output_data = Some(data_block);
+            // Step 5: output the block.
+            self.add_block(data_block)?;
         }
 
         Ok(())
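The deserializer's early exit hinges on `FusePartInfo::sort_min_max` plus `TopKSorter::never_match`: once the heap is full, a part whose sort-column range cannot beat the heap's worst element is skipped without being read. A toy version of that block-level test (ascending order, `i64` stats, hypothetical types):

```rust
// Sketch of the part-level short-circuit behind check_topn. Toy types only;
// ascending order, with an i64 (min, max) standing in for the column stats.
struct ToySorter {
    threshold: Option<i64>, // current worst value in a full top-N heap
}

impl ToySorter {
    fn never_match(&self, (min, _max): (i64, i64)) -> bool {
        // Ascending: if even the part's minimum exceeds the threshold,
        // no row in the part can enter the top N.
        matches!(self.threshold, Some(t) if min > t)
    }
}

fn main() {
    let sorter = ToySorter { threshold: Some(100) };
    assert!(sorter.never_match((150, 900))); // whole part is too large: skip it
    assert!(!sorter.never_match((10, 900))); // some rows may still qualify
    println!("part-level pruning works");
}
```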
diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs
index 17929a935e2b..59c433bbbfca 100644
--- a/src/query/storages/fuse/src/operations/read_partitions.rs
+++ b/src/query/storages/fuse/src/operations/read_partitions.rs
@@ -24,6 +24,7 @@ use common_catalog::plan::PartitionsShuffleKind;
 use common_catalog::plan::Projection;
 use common_catalog::plan::PruningStatistics;
 use common_catalog::plan::PushDownInfo;
+use common_catalog::plan::TopK;
 use common_catalog::table::Table;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
@@ -31,6 +32,8 @@ use common_expression::TableSchemaRef;
 use common_meta_app::schema::TableInfo;
 use common_storage::ColumnNodes;
 use opendal::Operator;
+use storages_common_index::Index;
+use storages_common_index::RangeIndex;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::Location;
 use tracing::debug;
@@ -154,7 +157,12 @@
         let partitions_scanned = block_metas.len();
 
-        let (mut statistics, parts) = Self::to_partitions(block_metas, &column_nodes, push_downs);
+        let top_k = push_downs
+            .as_ref()
+            .map(|p| p.top_k(self.schema().as_ref(), RangeIndex::supported_type))
+            .unwrap_or_default();
+        let (mut statistics, parts) =
+            Self::to_partitions(block_metas, &column_nodes, top_k, push_downs);
 
         // Update planner statistics.
         statistics.partitions_total = partitions_total;
@@ -173,6 +181,7 @@
     pub fn to_partitions(
         block_metas: &[(Option<Range<usize>>, Arc<BlockMeta>)],
         column_nodes: &ColumnNodes,
+        top_k: Option<TopK>,
         push_down: Option<PushDownInfo>,
     ) -> (PartStatistics, Partitions) {
         let limit = push_down
@@ -181,16 +190,38 @@
             .and_then(|p| p.limit)
             .unwrap_or(usize::MAX);
 
-        let (mut statistics, partitions) = match &push_down {
-            None => Self::all_columns_partitions(block_metas, limit),
-            Some(extras) => match &extras.projection {
-                None => Self::all_columns_partitions(block_metas, limit),
-                Some(projection) => {
-                    Self::projection_partitions(block_metas, column_nodes, projection, limit)
+        let mut block_metas = block_metas.to_vec();
+        if let Some(top_k) = &top_k {
+            block_metas.sort_by(|a, b| {
+                let a = a.1.col_stats.get(&top_k.column_id).unwrap();
+                let b = b.1.col_stats.get(&top_k.column_id).unwrap();
+
+                if top_k.asc {
+                    (a.min.as_ref(), a.max.as_ref()).cmp(&(b.min.as_ref(), b.max.as_ref()))
+                } else {
+                    (b.max.as_ref(), b.min.as_ref()).cmp(&(a.max.as_ref(), a.min.as_ref()))
                 }
+            });
+        }
+
+        let (mut statistics, mut partitions) = match &push_down {
+            None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit),
+            Some(extras) => match &extras.projection {
+                None => Self::all_columns_partitions(&block_metas, top_k.clone(), limit),
+                Some(projection) => Self::projection_partitions(
+                    &block_metas,
+                    column_nodes,
+                    projection,
+                    top_k.clone(),
+                    limit,
+                ),
             },
         };
 
+        if top_k.is_some() {
+            partitions.kind = PartitionsShuffleKind::Seq;
+        }
+
         statistics.is_exact = statistics.is_exact && Self::is_exact(&push_down);
         (statistics, partitions)
     }
@@ -204,6 +235,7 @@
     pub fn all_columns_partitions(
         block_metas: &[(Option<Range<usize>>, Arc<BlockMeta>)],
+        top_k: Option<TopK>,
         limit: usize,
     ) -> (PartStatistics, Partitions) {
         let mut statistics = PartStatistics::default_exact();
@@ -214,12 +246,11 @@
         }
 
         let mut remaining = limit;
-
         for (range, block_meta) in block_metas.iter() {
             let rows = block_meta.row_count as usize;
             partitions
                 .partitions
-                .push(Self::all_columns_part(range.clone(), block_meta));
+                .push(Self::all_columns_part(range.clone(), &top_k, block_meta));
             statistics.read_rows += rows;
             statistics.read_bytes += block_meta.block_size as usize;
@@ -241,6 +272,7 @@
         block_metas: &[(Option<Range<usize>>, Arc<BlockMeta>)],
         column_nodes: &ColumnNodes,
         projection: &Projection,
+        top_k: Option<TopK>,
         limit: usize,
     ) -> (PartStatistics, Partitions) {
         let mut statistics = PartStatistics::default_exact();
@@ -257,6 +289,7 @@
                 block_meta,
                 range.clone(),
                 column_nodes,
+                top_k.clone(),
                 projection,
             ));
             let rows = block_meta.row_count as usize;
@@ -285,7 +318,11 @@
         (statistics, partitions)
     }
 
-    pub fn all_columns_part(range: Option<Range<usize>>, meta: &BlockMeta) -> PartInfoPtr {
+    pub fn all_columns_part(
+        range: Option<Range<usize>>,
+        top_k: &Option<TopK>,
+        meta: &BlockMeta,
+    ) -> PartInfoPtr {
         let mut columns_meta = HashMap::with_capacity(meta.col_metas.len());
 
         for (idx, column_meta) in &meta.col_metas {
@@ -295,12 +332,19 @@
         let rows_count = meta.row_count;
         let location = meta.location.0.clone();
         let format_version = meta.location.1;
+
+        let sort_min_max = top_k.as_ref().map(|top_k| {
+            let stat = meta.col_stats.get(&top_k.column_id).unwrap();
+            (stat.min.clone(), stat.max.clone())
+        });
+
         FusePartInfo::create(
             location,
             format_version,
             rows_count,
             columns_meta,
             meta.compression(),
+            sort_min_max,
             range,
         )
     }
@@ -309,6 +353,7 @@
         meta: &BlockMeta,
         range: Option<Range<usize>>,
         column_nodes: &ColumnNodes,
+        top_k: Option<TopK>,
         projection: &Projection,
     ) -> PartInfoPtr {
         let mut columns_meta = HashMap::with_capacity(projection.len());
@@ -326,6 +371,12 @@
         let rows_count = meta.row_count;
         let location = meta.location.0.clone();
         let format_version = meta.location.1;
+
+        let sort_min_max = top_k.map(|top_k| {
+            let stat = meta.col_stats.get(&top_k.column_id).unwrap();
+            (stat.min.clone(), stat.max.clone())
+        });
+
         // TODO
         // row_count should be a hint value of LIMIT,
         // not the count the rows in this partition
@@ -335,6 +386,7 @@
             rows_count,
             columns_meta,
             meta.compression(),
+            sort_min_max,
             range,
         )
     }
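`to_partitions` now sorts block metas by the TopK column's `(min, max)` statistics and pins `PartitionsShuffleKind::Seq`, so the blocks most likely to fill the heap with tight bounds are read first, and later blocks can be pruned by `never_match`. A sketch of that ordering rule under the same assumptions (toy stats struct, ascending/descending handled as in the diff):

```rust
// Sketch of the block ordering used when a TopK exists; BlockStat is a toy
// stand-in for the real per-column statistics.
#[derive(Debug)]
struct BlockStat {
    name: &'static str,
    min: i64,
    max: i64,
}

fn order_blocks(blocks: &mut [BlockStat], asc: bool) {
    blocks.sort_by(|a, b| {
        if asc {
            (a.min, a.max).cmp(&(b.min, b.max))
        } else {
            (b.max, b.min).cmp(&(a.max, a.min))
        }
    });
}

fn main() {
    let mut blocks = vec![
        BlockStat { name: "b0", min: 50, max: 90 },
        BlockStat { name: "b1", min: 1, max: 10 },
        BlockStat { name: "b2", min: 20, max: 80 },
    ];
    order_blocks(&mut blocks, true);
    // ASC: b1 first — reading it first tightens the heap threshold early.
    println!("{:?}", blocks.iter().map(|b| b.name).collect::<Vec<_>>());
}
```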
diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
index ab03c5dad104..1ace919e486f 100644
--- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs
+++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs
@@ -181,7 +181,7 @@ impl FusePruner {
     }
 
     // topn pruner:
-    // if there are ordering + limit clause, use topn pruner
+    // if there is an ordering + limit clause and no filters, use the topn pruner
     fn topn_pruning(
         &self,
         metas: Vec<(BlockMetaIndex, Arc<BlockMeta>)>,
@@ -189,7 +189,7 @@
         let push_down = self.push_down.clone();
         if push_down
             .as_ref()
-            .filter(|p| !p.order_by.is_empty() && p.limit.is_some())
+            .filter(|p| !p.order_by.is_empty() && p.limit.is_some() && p.filters.is_empty())
             .is_some()
         {
             let schema = self.table_schema.clone();
diff --git a/src/query/storages/hive/hive/src/hive_table_source.rs b/src/query/storages/hive/hive/src/hive_table_source.rs
index 36cb22758737..3849fb7ba67a 100644
--- a/src/query/storages/hive/hive/src/hive_table_source.rs
+++ b/src/query/storages/hive/hive/src/hive_table_source.rs
@@ -24,6 +24,7 @@ use common_catalog::plan::PartInfoPtr;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::filter_helper::FilterHelpers;
 use common_expression::types::AnyType;
 use common_expression::DataBlock;
 use common_expression::DataSchemaRef;
@@ -150,7 +151,7 @@ impl HiveTableSource {
                 })?;
                 valids.push(res.clone());
                 // shortcut, if predicates is const boolean (or can be cast to boolean)
-                match DataBlock::filter_exists(&res)? {
+                match FilterHelpers::filter_exists(&res)? {
                     true => {
                         exists = true;
                     }
diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs
index da92bfa677d5..c79a4bc5d512 100644
--- a/src/query/storages/parquet/src/parquet_source.rs
+++ b/src/query/storages/parquet/src/parquet_source.rs
@@ -24,6 +24,7 @@ use common_catalog::plan::PartInfoPtr;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
+use common_expression::filter_helper::FilterHelpers;
 use common_expression::types::BooleanType;
 use common_expression::DataBlock;
 use common_expression::DataSchemaRef;
@@ -136,7 +137,7 @@ impl ParquetSource {
             let res = evaluator.run(filter).map_err(|(_, e)| {
                 ErrorCode::Internal(format!("eval prewhere filter failed: {}.", e))
             })?;
-            let filter = DataBlock::cast_to_nonull_boolean(&res).ok_or_else(|| {
+            let filter = FilterHelpers::cast_to_nonull_boolean(&res).ok_or_else(|| {
                 ErrorCode::BadArguments(
                     "Result of filter expression cannot be converted to boolean.",
                 )