refactor: use temp table to refactor materialized cte #16900

Draft · wants to merge 7 commits into main
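This PR replaces the in-memory materialization path (the `MaterializedCtesBlocks` store and the `MaterializedCteSource`/`MaterializedCteSink` processors) with ordinary temporary tables, so reading a materialized CTE becomes a plain table scan. A minimal SQL sketch of the idea, for context only: the rewrite actually happens inside the planner, and `cte_tmp` is a hypothetical name.

-- User-facing query: the CTE body runs once and its result is read twice.
WITH t AS MATERIALIZED (
    SELECT number FROM numbers(100)
)
SELECT * FROM t AS t1 JOIN t AS t2 USING (number);

-- Roughly what the engine now does internally: land the CTE result in a
-- temp table, scan it like any other table, and drop it when the query ends.
CREATE TEMP TABLE cte_tmp AS SELECT number FROM numbers(100);
SELECT * FROM cte_tmp AS t1 JOIN cte_tmp AS t2 USING (number);
DROP TABLE cte_tmp;

This framing explains the diff below: the `MaterializedCteSource` special case in the source processors goes away (temp-table scans should report scan progress like any other scan), and `TableContext` gains `remove_table_from_cache`, presumably so a dropped CTE temp table can be evicted from the session's table cache.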
18 changes: 2 additions & 16 deletions src/query/catalog/src/table_context.rs
@@ -29,7 +29,6 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_exception::ResultExt;
 use databend_common_expression::BlockThresholds;
-use databend_common_expression::DataBlock;
 use databend_common_expression::Expr;
 use databend_common_expression::FunctionContext;
 use databend_common_expression::Scalar;
@@ -78,8 +77,6 @@ use crate::runtime_filter_info::RuntimeFilterReady;
 use crate::statistics::data_cache_statistics::DataCacheMetrics;
 use crate::table::Table;
 
-pub type MaterializedCtesBlocks = Arc<RwLock<HashMap<(usize, usize), Arc<RwLock<Vec<DataBlock>>>>>>;
-
 pub struct ContextError;
 
 #[derive(Debug)]
@@ -264,6 +261,8 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str)
         -> Result<Arc<dyn Table>>;
 
+    fn remove_table_from_cache(&self, catalog: &str, database: &str, table: &str);
+
     async fn get_table_with_batch(
         &self,
         catalog: &str,
@@ -281,19 +280,6 @@
         max_files: Option<usize>,
     ) -> Result<FilteredCopyFiles>;
 
-    fn set_materialized_cte(
-        &self,
-        idx: (usize, usize),
-        mem_table: Arc<RwLock<Vec<DataBlock>>>,
-    ) -> Result<()>;
-
-    fn get_materialized_cte(
-        &self,
-        idx: (usize, usize),
-    ) -> Result<Option<Arc<RwLock<Vec<DataBlock>>>>>;
-
-    fn get_materialized_ctes(&self) -> MaterializedCtesBlocks;
-
     fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
 
     fn clear_segment_locations(&self) -> Result<()>;
4 changes: 1 addition & 3 deletions src/query/pipeline/sources/src/async_source.rs
@@ -115,9 +115,7 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
         match self.inner.generate().await? {
             None => self.is_finish = true,
             Some(data_block) => {
-                // Don't need to record the scan progress of `MaterializedCteSource`
-                // Because it reads data from memory.
-                if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
+                if !data_block.is_empty() {
                     let progress_values = ProgressValues {
                         rows: data_block.num_rows(),
                         bytes: data_block.memory_size(),
4 changes: 1 addition & 3 deletions src/query/pipeline/sources/src/prefetch_async_source.rs
@@ -116,9 +116,7 @@ impl<T: 'static + PrefetchAsyncSource> Processor for PrefetchAsyncSourcer<T> {
         match self.inner.generate().await? {
             None => self.is_inner_finish = true,
             Some(data_block) => {
-                // Don't need to record the scan progress of `MaterializedCteSource`
-                // Because it reads data from memory.
-                if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
+                if !data_block.is_empty() {
                     let progress_values = ProgressValues {
                         rows: data_block.num_rows(),
                         bytes: data_block.memory_size(),
73 changes: 0 additions & 73 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -19,19 +19,14 @@ use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_sinks::Sinker;
 use databend_common_sql::executor::physical_plans::HashJoin;
-use databend_common_sql::executor::physical_plans::MaterializedCte;
 use databend_common_sql::executor::physical_plans::RangeJoin;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_sql::ColumnBinding;
-use databend_common_sql::IndexType;
 
 use crate::pipelines::processors::transforms::range_join::RangeJoinState;
 use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft;
 use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
 use crate::pipelines::processors::transforms::HashJoinBuildState;
 use crate::pipelines::processors::transforms::HashJoinProbeState;
-use crate::pipelines::processors::transforms::MaterializedCteSink;
-use crate::pipelines::processors::transforms::MaterializedCteState;
 use crate::pipelines::processors::transforms::TransformHashJoinBuild;
 use crate::pipelines::processors::transforms::TransformHashJoinProbe;
 use crate::pipelines::processors::HashJoinDesc;
@@ -77,8 +72,6 @@ impl PipelineBuilder {
             right_side_context,
             self.main_pipeline.get_scopes(),
         );
-        right_side_builder.cte_state = self.cte_state.clone();
-        right_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
         right_side_builder.hash_join_states = self.hash_join_states.clone();
 
         let mut right_res = right_side_builder.finalize(&range_join.right)?;
@@ -148,8 +141,6 @@
             build_side_context,
             self.main_pipeline.get_scopes(),
         );
-        build_side_builder.cte_state = self.cte_state.clone();
-        build_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
         build_side_builder.hash_join_states = self.hash_join_states.clone();
         let mut build_res = build_side_builder.finalize(build)?;
 
@@ -224,68 +215,4 @@
 
         Ok(())
     }
-
-    pub(crate) fn build_materialized_cte(
-        &mut self,
-        materialized_cte: &MaterializedCte,
-    ) -> Result<()> {
-        self.cte_scan_offsets.insert(
-            materialized_cte.cte_idx,
-            materialized_cte.cte_scan_offset.clone(),
-        );
-        self.expand_materialized_side_pipeline(
-            &materialized_cte.right,
-            materialized_cte.cte_idx,
-            &materialized_cte.materialized_output_columns,
-        )?;
-        self.build_pipeline(&materialized_cte.left)
-    }
-
-    fn expand_materialized_side_pipeline(
-        &mut self,
-        materialized_side: &PhysicalPlan,
-        cte_idx: IndexType,
-        materialized_output_columns: &[ColumnBinding],
-    ) -> Result<()> {
-        let materialized_side_ctx = QueryContext::create_from(self.ctx.clone());
-        let state = Arc::new(MaterializedCteState::new(self.ctx.clone()));
-        self.cte_state.insert(cte_idx, state.clone());
-        let mut materialized_side_builder = PipelineBuilder::create(
-            self.func_ctx.clone(),
-            self.settings.clone(),
-            materialized_side_ctx,
-            self.main_pipeline.get_scopes(),
-        );
-        materialized_side_builder.cte_state = self.cte_state.clone();
-        materialized_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
-        materialized_side_builder.hash_join_states = self.hash_join_states.clone();
-        let mut materialized_side_pipeline =
-            materialized_side_builder.finalize(materialized_side)?;
-        assert!(
-            materialized_side_pipeline
-                .main_pipeline
-                .is_pulling_pipeline()?
-        );
-
-        PipelineBuilder::build_result_projection(
-            &self.func_ctx,
-            materialized_side.output_schema()?,
-            materialized_output_columns,
-            &mut materialized_side_pipeline.main_pipeline,
-            false,
-        )?;
-
-        materialized_side_pipeline.main_pipeline.add_sink(|input| {
-            let transform = Sinker::<MaterializedCteSink>::create(
-                input,
-                MaterializedCteSink::create(self.ctx.clone(), cte_idx, state.clone())?,
-            );
-            Ok(ProcessorPtr::create(transform))
-        })?;
-        self.pipelines
-            .push(materialized_side_pipeline.main_pipeline.finalize());
-        self.pipelines
-            .extend(materialized_side_pipeline.sources_pipelines);
-        Ok(())
-    }
 }
22 changes: 0 additions & 22 deletions src/query/service/src/pipelines/builders/builder_scan.rs
@@ -24,14 +24,12 @@ use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::executor::physical_plans::CacheScan;
 use databend_common_sql::executor::physical_plans::ConstantTableScan;
-use databend_common_sql::executor::physical_plans::CteScan;
 use databend_common_sql::executor::physical_plans::ExpressionScan;
 use databend_common_sql::executor::physical_plans::TableScan;
 use databend_common_sql::plans::CacheSource;
 
 use crate::pipelines::processors::transforms::CacheSourceState;
 use crate::pipelines::processors::transforms::HashJoinCacheState;
-use crate::pipelines::processors::transforms::MaterializedCteSource;
 use crate::pipelines::processors::transforms::TransformAddInternalColumns;
 use crate::pipelines::processors::transforms::TransformCacheScan;
 use crate::pipelines::processors::transforms::TransformExpressionScan;
@@ -76,26 +74,6 @@ impl PipelineBuilder {
         Ok(())
     }
 
-    pub(crate) fn build_cte_scan(&mut self, cte_scan: &CteScan) -> Result<()> {
-        let max_threads = self.settings.get_max_threads()?;
-        self.main_pipeline.add_source(
-            |output| {
-                MaterializedCteSource::create(
-                    self.ctx.clone(),
-                    output,
-                    cte_scan.cte_idx,
-                    self.cte_state.get(&cte_scan.cte_idx.0).unwrap().clone(),
-                    cte_scan.offsets.clone(),
-                    self.cte_scan_offsets
-                        .get(&cte_scan.cte_idx.0)
-                        .unwrap()
-                        .clone(),
-                )
-            },
-            max_threads as usize,
-        )
-    }
-
     pub(crate) fn build_constant_table_scan(&mut self, scan: &ConstantTableScan) -> Result<()> {
         self.main_pipeline.add_source(
             |output| {
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/builders/builder_union_all.rs
@@ -78,8 +78,6 @@
             union_ctx,
             self.main_pipeline.get_scopes(),
         );
-        pipeline_builder.cte_state = self.cte_state.clone();
-        pipeline_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
         pipeline_builder.hash_join_states = self.hash_join_states.clone();
 
         let mut build_res = pipeline_builder.finalize(input)?;
13 changes: 0 additions & 13 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -25,12 +25,10 @@ use databend_common_pipeline_core::processors::PlanScopeGuard;
 use databend_common_pipeline_core::Pipeline;
 use databend_common_settings::Settings;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_sql::IndexType;
 
 use super::PipelineBuilderData;
 use crate::interpreters::CreateTableInterpreter;
 use crate::pipelines::processors::transforms::HashJoinBuildState;
-use crate::pipelines::processors::transforms::MaterializedCteState;
 use crate::pipelines::processors::HashJoinState;
 use crate::pipelines::PipelineBuildResult;
 use crate::servers::flight::v1::exchange::DefaultExchangeInjector;
@@ -49,11 +47,6 @@
     pub merge_into_probe_data_fields: Option<Vec<DataField>>,
     pub join_state: Option<Arc<HashJoinBuildState>>,
 
-    // The cte state of each materialized cte.
-    pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
-    // The column offsets used by cte scan
-    pub cte_scan_offsets: HashMap<IndexType, Vec<usize>>,
-
     pub(crate) exchange_injector: Arc<dyn ExchangeInjector>,
 
     pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,
@@ -78,8 +71,6 @@
             pipelines: vec![],
             main_pipeline: Pipeline::with_scopes(scopes),
             exchange_injector: DefaultExchangeInjector::create(),
-            cte_state: HashMap::new(),
-            cte_scan_offsets: HashMap::new(),
             merge_into_probe_data_fields: None,
             join_state: None,
             hash_join_states: HashMap::new(),
@@ -162,7 +153,6 @@
 
         match plan {
             PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
-            PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
             PhysicalPlan::ConstantTableScan(scan) => self.build_constant_table_scan(scan),
             PhysicalPlan::Filter(filter) => self.build_filter(filter),
             PhysicalPlan::EvalScalar(eval_scalar) => self.build_eval_scalar(eval_scalar),
@@ -189,9 +179,6 @@
                 "Invalid physical plan with PhysicalPlan::Exchange",
             )),
             PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
-            PhysicalPlan::MaterializedCte(materialized_cte) => {
-                self.build_materialized_cte(materialized_cte)
-            }
            PhysicalPlan::CacheScan(cache_scan) => self.build_cache_scan(cache_scan),
             PhysicalPlan::ExpressionScan(expression_scan) => {
                 self.build_expression_scan(expression_scan)
4 changes: 0 additions & 4 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -28,7 +28,6 @@ mod transform_dictionary;
 mod transform_expression_scan;
 mod transform_filter;
 mod transform_limit;
-mod transform_materialized_cte;
 mod transform_merge_block;
 mod transform_null_if;
 mod transform_recursive_cte_scan;
@@ -55,9 +54,6 @@ pub use transform_create_sets::TransformCreateSets;
 pub use transform_expression_scan::TransformExpressionScan;
 pub use transform_filter::TransformFilter;
 pub use transform_limit::TransformLimit;
-pub use transform_materialized_cte::MaterializedCteSink;
-pub use transform_materialized_cte::MaterializedCteSource;
-pub use transform_materialized_cte::MaterializedCteState;
 pub use transform_merge_block::TransformMergeBlock;
 pub use transform_null_if::TransformNullIf;
 pub use transform_recursive_cte_scan::TransformRecursiveCteScan;