Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): add topn runtime filter in native storage format #9738

Merged
merged 10 commits into from
Jan 27, 2023
47 changes: 47 additions & 0 deletions src/query/catalog/src/plan/pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use std::fmt::Debug;

use common_expression::types::DataType;
use common_expression::RemoteExpr;
use common_expression::TableField;
use common_expression::TableSchema;

use crate::plan::Projection;
Expand Down Expand Up @@ -48,7 +50,52 @@ pub struct PushDownInfo {
pub order_by: Vec<(RemoteExpr<String>, bool, bool)>,
}

/// TopK is a wrapper for topk push down items.
/// We only take the first column in order_by as the topk column.
#[derive(Debug, Clone)]
pub struct TopK {
pub limit: usize,
pub order_by: TableField,
pub asc: bool,
pub column_id: u32,
}

impl PushDownInfo {
pub fn top_k(&self, schema: &TableSchema, support: fn(&DataType) -> bool) -> Option<TopK> {
if !self.order_by.is_empty() && self.limit.is_some() {
let order = &self.order_by[0];
let limit = self.limit.unwrap();

const MAX_TOPK_LIMIT: usize = 1000;
if limit > MAX_TOPK_LIMIT {
return None;
}

if let RemoteExpr::<String>::ColumnRef { id, .. } = &order.0 {
let field = schema.field_with_name(id).unwrap();
let data_type: DataType = field.data_type().into();
if !support(&data_type) {
return None;
}

let leaf_fields = schema.leaf_fields();
let column_id = leaf_fields.iter().position(|p| p == field).unwrap();

let top_k = TopK {
limit: self.limit.unwrap(),
order_by: field.clone(),
asc: order.1,
column_id: column_id as u32,
};
Some(top_k)
} else {
None
}
} else {
None
}
}

pub fn prewhere_of_push_downs(push_downs: &Option<PushDownInfo>) -> Option<PrewhereInfo> {
if let Some(PushDownInfo { prewhere, .. }) = push_downs {
prewhere.clone()
Expand Down
104 changes: 16 additions & 88 deletions src/query/expression/src/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,124 +20,39 @@ use common_arrow::arrow::buffer::Buffer;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::filter_helper::FilterHelpers;
use crate::types::array::ArrayColumnBuilder;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::number::NumberScalar;
use crate::types::string::StringColumnBuilder;
use crate::types::AnyType;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BooleanType;
use crate::types::StringType;
use crate::types::ValueType;
use crate::types::VariantType;
use crate::with_number_mapped_type;
use crate::with_number_type;
use crate::BlockEntry;
use crate::Column;
use crate::ColumnBuilder;
use crate::DataBlock;
use crate::Scalar;
use crate::TypeDeserializer;
use crate::Value;

impl DataBlock {
// check if the predicate has any valid row
pub fn filter_exists(predicate: &Value<AnyType>) -> Result<bool> {
let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Filter predict column does not support type '{:?}'",
predicate
))
})?;
match predicate {
Value::Scalar(s) => Ok(s),
Value::Column(bitmap) => Ok(bitmap.len() != bitmap.unset_bits()),
}
}

pub fn filter(self, predicate: &Value<AnyType>) -> Result<DataBlock> {
if self.num_rows() == 0 {
return Ok(self);
}

let predicate = Self::cast_to_nonull_boolean(predicate).ok_or_else(|| {
let predicate = FilterHelpers::cast_to_nonull_boolean(predicate).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Filter predict column does not support type '{:?}'",
predicate
))
})?;

match predicate {
Value::Scalar(s) => {
if s {
Ok(self)
} else {
Ok(self.slice(0..0))
}
}
Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
}
}

// Must be numeric, boolean, or string value type
pub fn cast_to_nonull_boolean(predicate: &Value<AnyType>) -> Option<Value<BooleanType>> {
match predicate {
Value::Scalar(s) => Self::cast_scalar_to_boolean(s).map(Value::Scalar),
Value::Column(c) => Self::cast_column_to_boolean(c).map(Value::Column),
}
}

fn cast_scalar_to_boolean(s: &Scalar) -> Option<bool> {
match s {
Scalar::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
NumberScalar::SRC_TYPE(value) => Some(value != &SRC_TYPE::default()),
}),
Scalar::Boolean(value) => Some(*value),
Scalar::String(value) => Some(!value.is_empty()),
Scalar::Timestamp(value) => Some(*value != 0),
Scalar::Date(value) => Some(*value != 0),
Scalar::Null => Some(false),
_ => None,
}
}

fn cast_column_to_boolean(c: &Column) -> Option<Bitmap> {
match c {
Column::Number(num) => with_number_mapped_type!(|SRC_TYPE| match num {
NumberColumn::SRC_TYPE(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| v != &SRC_TYPE::default()),
&[],
)),
}),
Column::Boolean(value) => Some(value.clone()),
Column::String(value) => Some(BooleanType::column_from_iter(
value.iter().map(|s| !s.is_empty()),
&[],
)),
Column::Timestamp(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| *v != 0),
&[],
)),
Column::Date(value) => Some(BooleanType::column_from_iter(
value.iter().map(|v| *v != 0),
&[],
)),
Column::Null { len } => Some(MutableBitmap::from_len_zeroed(*len).into()),
Column::Nullable(c) => {
let inner = Self::cast_column_to_boolean(&c.column)?;
Some((&inner) & (&c.validity))
}
_ => None,
}
}

pub fn try_as_const_bool(value: &Value<BooleanType>) -> Result<Option<bool>> {
match value {
Value::Scalar(v) => Ok(Some(*v)),
_ => Ok(None),
}
self.filter_boolean_value(predicate)
}

pub fn filter_with_bitmap(block: DataBlock, bitmap: &Bitmap) -> Result<DataBlock> {
Expand Down Expand Up @@ -169,6 +84,19 @@ impl DataBlock {
}
}
}

pub fn filter_boolean_value(self, filter: Value<BooleanType>) -> Result<DataBlock> {
match filter {
Value::Scalar(s) => {
if s {
Ok(self)
} else {
Ok(self.slice(0..0))
}
}
Value::Column(bitmap) => Self::filter_with_bitmap(self, &bitmap),
}
}
}

impl Column {
Expand Down
2 changes: 2 additions & 0 deletions src/query/expression/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ mod scatter;
mod sort;
mod take;
mod take_chunks;
mod topk;

pub use group_by::*;
pub use group_by_hash::*;
pub use sort::*;
pub use take_chunks::*;
pub use topk::*;
Loading