feat: track memory usage for recursive CTE, enable recursive CTEs by default #9619

Merged · 8 commits · Mar 16, 2024
Changes from 7 commits
4 changes: 1 addition & 3 deletions datafusion/common/src/config.rs
@@ -296,9 +296,7 @@ config_namespace! {
pub listing_table_ignore_subdirectory: bool, default = true

/// Should DataFusion support recursive CTEs
/// Defaults to false since this feature is a work in progress and may not
/// behave as expected
pub enable_recursive_ctes: bool, default = false
pub enable_recursive_ctes: bool, default = true
}
}

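Since this PR flips the default to `true`, downstream users who depended on the old behavior need to opt out explicitly. A minimal sketch of doing so (assuming the standard `SessionConfig`/`SessionContext` API; this snippet is illustrative and not part of the diff):

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Recursive CTEs are on by default after this change; opt out explicitly.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.enable_recursive_ctes", false);
    let ctx = SessionContext::new_with_config(config);

    // With the flag disabled, planning a WITH RECURSIVE query should fail with
    // "This feature is not implemented: Recursive CTEs are not enabled".
    let result = ctx
        .sql(
            "WITH RECURSIVE n AS (
                 SELECT 1 AS i
                 UNION ALL
                 SELECT i + 1 FROM n WHERE i < 5
             )
             SELECT * FROM n",
        )
        .await;
    assert!(result.is_err());
    Ok(())
}
```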
22 changes: 22 additions & 0 deletions datafusion/core/tests/memory_limit.rs
@@ -301,6 +301,28 @@ async fn sort_spill_reservation() {
test.with_config(config).with_expected_success().run().await;
}

#[tokio::test]
async fn oom_recursive_cte() {
TestCase::new()
.with_query(
"WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT UNNEST(RANGE(id+1, id+1000)) as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes;",
)
.with_expected_errors(vec![
Contributor: 👍
"Resources exhausted: Failed to allocate additional",
"RecursiveQuery",
])
.with_memory_limit(2_000)
.run()
.await
}

/// Runs the query with the specified memory limit
/// and verifies that the expected errors are returned
#[derive(Clone, Debug)]
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/memory.rs
@@ -31,6 +31,7 @@ use super::{
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, project_schema, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

@@ -236,6 +237,8 @@ impl MemoryExec {
pub struct MemoryStream {
/// Vector of record batches
data: Vec<RecordBatch>,
/// Optional memory reservation bound to the data, freed on drop
Contributor:
I was a little worried at first that this optional API makes it easy to forget to provide a reservation. However, I see now that the reservation is only used in the recursive CTE case.

(Not for this PR.) In general I wondered whether we should always attach a memory reservation to a MemoryStream 🤔 I think that would double-count batches from a MemTable, however, so it isn't an obviously good improvement.

Member Author:
This concern is quite reasonable; previously there was no need to attach a MemoryReservation to the MemoryStream, and the with_reservation function is a bit confusing.

I plan to stop using MemoryStream for recursive CTEs and instead use a new stream type that can read the WorkTable multiple times, in order to support
https://github.com/apache/arrow-datafusion/blob/81b0a011705b17a09f494f550a5382b0c3414597/datafusion/physical-plan/src/recursive_query.rs#L316
so that the MemoryStream::with_reservation API can be removed.

reservation: Option<MemoryReservation>,
/// Schema representing the data
schema: SchemaRef,
/// Optional projection for which columns to load
@@ -253,11 +256,18 @@ impl MemoryStream {
) -> Result<Self> {
Ok(Self {
data,
reservation: None,
schema,
projection,
index: 0,
})
}

/// Set the memory reservation for the data
pub(super) fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
self.reservation = Some(reservation);
self
}
}

impl Stream for MemoryStream {
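A note on the thread above: the reservation field works because `MemoryReservation` returns its bytes to the pool in its `Drop` impl, so binding it to the stream's lifetime is the only bookkeeping needed. A standalone sketch of that RAII pattern (`ReservedData` is a made-up type for illustration, not the actual `MemoryStream`):

```rust
use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation,
};

/// Holds a memory reservation for as long as the wrapped data is alive.
struct ReservedData {
    _rows: Vec<u64>,
    // Dropping this field returns the reserved bytes to the pool.
    _reservation: MemoryReservation,
}

fn main() -> datafusion_common::Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut reservation = MemoryConsumer::new("example").register(&pool);

    let rows: Vec<u64> = (0..64).collect();
    reservation.try_grow(rows.len() * std::mem::size_of::<u64>())?; // 512 bytes

    let data = ReservedData {
        _rows: rows,
        _reservation: reservation,
    };
    assert_eq!(pool.reserved(), 512);

    drop(data); // the reservation's Drop frees the bytes
    assert_eq!(pool.reserved(), 0);
    Ok(())
}
```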
15 changes: 13 additions & 2 deletions datafusion/physical-plan/src/recursive_query.rs
@@ -32,6 +32,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

@@ -236,6 +237,8 @@ struct RecursiveQueryStream {
/// In-memory buffer for storing a copy of the current results. Will be
/// cleared after each iteration.
buffer: Vec<RecordBatch>,
/// Tracks the memory used by the buffer
reservation: MemoryReservation,
// /// Metrics.
_baseline_metrics: BaselineMetrics,
}
@@ -250,6 +253,8 @@ impl RecursiveQueryStream {
baseline_metrics: BaselineMetrics,
) -> Self {
let schema = static_stream.schema();
let reservation =
MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
Self {
task_context,
work_table,
Expand All @@ -258,6 +263,7 @@ impl RecursiveQueryStream {
recursive_stream: None,
schema,
buffer: vec![],
reservation,
_baseline_metrics: baseline_metrics,
}
}
@@ -268,6 +274,10 @@ impl RecursiveQueryStream {
mut self: std::pin::Pin<&mut Self>,
batch: RecordBatch,
) -> Poll<Option<Result<RecordBatch>>> {
if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
return Poll::Ready(Some(Err(e)));
}

self.buffer.push(batch.clone());
Poll::Ready(Some(Ok(batch)))
}
@@ -289,8 +299,9 @@ impl RecursiveQueryStream {
}

// Update the work table with the current buffer
let batches = self.buffer.drain(..).collect();
self.work_table.write(batches);
let batches = std::mem::take(&mut self.buffer);
let reservation = self.reservation.take();
self.work_table.update(batches, reservation);

// We always execute (and re-execute iteratively) the first partition.
// Downstream plans should not expect any partitioning.
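The two hunks above follow a grow-then-hand-off pattern: every buffered batch is accounted for with `try_grow`, and at the end of an iteration `MemoryReservation::take` moves the accumulated bytes out together with the batches, leaving the stream's reservation at zero for the next pass. A condensed sketch of just that pattern (a simplified stand-in type, not the full `RecursiveQueryStream`):

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

/// Simplified stand-in for the buffering half of RecursiveQueryStream.
struct IterationBuffer {
    buffer: Vec<RecordBatch>,
    reservation: MemoryReservation,
}

impl IterationBuffer {
    /// Account for a batch before buffering it; when the pool limit is
    /// exceeded this surfaces as a "Resources exhausted" error.
    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        self.reservation.try_grow(batch.get_array_memory_size())?;
        self.buffer.push(batch);
        Ok(())
    }

    /// Move the batches and their accounting out together. `take()` returns
    /// a new reservation holding the current bytes and leaves
    /// `self.reservation` empty, ready for the next iteration.
    fn hand_off(&mut self) -> (Vec<RecordBatch>, MemoryReservation) {
        let batches = std::mem::take(&mut self.buffer);
        let reservation = self.reservation.take();
        (batches, reservation)
    }
}
```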
93 changes: 77 additions & 16 deletions datafusion/physical-plan/src/work_table.rs
@@ -29,16 +29,34 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProp

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, Result};
use datafusion_common::{internal_datafusion_err, internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

/// A vector of record batches with a memory reservation.
#[derive(Debug)]
struct ReservedBatches {
batches: Vec<RecordBatch>,
#[allow(dead_code)]
reservation: MemoryReservation,
}

impl ReservedBatches {
fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
ReservedBatches {
batches,
reservation,
}
}
}

/// The name is from PostgreSQL's terminology.
/// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
/// This table serves as a mirror or buffer between each iteration of a recursive query.
#[derive(Debug)]
pub(super) struct WorkTable {
batches: Mutex<Option<Vec<RecordBatch>>>,
batches: Mutex<Option<ReservedBatches>>,
}

impl WorkTable {
@@ -51,14 +69,22 @@ impl WorkTable {

/// Take the previously written batches from the work table.
/// This will be called by the [`WorkTableExec`] when it is executed.
fn take(&self) -> Vec<RecordBatch> {
let batches = self.batches.lock().unwrap().take().unwrap_or_default();
batches
fn take(&self) -> Result<ReservedBatches> {
self.batches
.lock()
.unwrap()
.take()
.ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
}

/// Write the results of a recursive query iteration to the work table.
pub(super) fn write(&self, input: Vec<RecordBatch>) {
self.batches.lock().unwrap().replace(input);
/// Update the work table with the results of a recursive query iteration.
pub(super) fn update(
&self,
batches: Vec<RecordBatch>,
reservation: MemoryReservation,
) {
let reserved_batches = ReservedBatches::new(batches, reservation);
self.batches.lock().unwrap().replace(reserved_batches);
}
}

@@ -175,13 +201,11 @@ impl ExecutionPlan for WorkTableExec {
"WorkTableExec got an invalid partition {partition} (expected 0)"
);
}

let batches = self.work_table.take();
Ok(Box::pin(MemoryStream::try_new(
batches,
self.schema.clone(),
None,
)?))
let batch = self.work_table.take()?;
Ok(Box::pin(
MemoryStream::try_new(batch.batches, self.schema.clone(), None)?
.with_reservation(batch.reservation),
))
}

fn metrics(&self) -> Option<MetricsSet> {
Expand All @@ -194,4 +218,41 @@ impl ExecutionPlan for WorkTableExec {
}

#[cfg(test)]
mod tests {}
mod tests {
use super::*;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};
use std::sync::Arc;

#[test]
fn test_work_table() {
let work_table = WorkTable::new();
// can't take from an empty work_table
assert!(work_table.take().is_err());

let pool = Arc::new(UnboundedMemoryPool::default()) as _;
let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);

// update the work_table with a batch
let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
reservation.try_grow(100).unwrap();
work_table.update(vec![batch.clone()], reservation);
// take from work_table
let reserved_batches = work_table.take().unwrap();
assert_eq!(reserved_batches.batches, vec![batch.clone()]);

// hand the batches to a MemoryStream together with their reservation
let memory_stream =
MemoryStream::try_new(reserved_batches.batches, batch.schema(), None)
.unwrap()
.with_reservation(reserved_batches.reservation);

// should still be reserved
assert_eq!(pool.reserved(), 100);

// the reservation should be freed once the memory_stream is dropped
drop(memory_stream);
assert_eq!(pool.reserved(), 0);
}
}
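For orientation, `WorkTable::update` and `WorkTable::take` drive the loop described in the PostgreSQL-style comment above: the static term seeds the first generation of rows, and each pass of the recursive term consumes one generation and installs the next, until an iteration produces no rows. A schematic sketch (the closures are hypothetical stand-ins for executing the two halves of the plan, not DataFusion APIs):

```rust
use arrow::record_batch::RecordBatch;

/// Schematic of the iteration protocol around the work table.
fn drive_recursion(
    run_static_term: impl FnOnce() -> Vec<RecordBatch>,
    mut run_recursive_term: impl FnMut(&[RecordBatch]) -> Vec<RecordBatch>,
    mut emit: impl FnMut(&[RecordBatch]),
) {
    // The static term produces the first generation of rows.
    let mut generation = run_static_term();
    emit(&generation);

    // Each pass reads the previous generation (WorkTable::take) and
    // replaces it with the rows it produced (WorkTable::update),
    // stopping once an iteration yields no new rows.
    while generation.iter().map(|b| b.num_rows()).sum::<usize>() > 0 {
        generation = run_recursive_term(&generation);
        emit(&generation);
    }
}
```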
38 changes: 18 additions & 20 deletions datafusion/sql/tests/sql_integration.rs
@@ -1387,15 +1387,21 @@ fn recursive_ctes() {
select n + 1 FROM numbers WHERE N < 10
)
select * from numbers;";
let err = logical_plan(sql).expect_err("query should have failed");
assert_eq!(
"This feature is not implemented: Recursive CTEs are not enabled",
err.strip_backtrace()
);
quick_test(
sql,
"Projection: numbers.n\
\n SubqueryAlias: numbers\
\n RecursiveQuery: is_distinct=false\
\n Projection: Int64(1) AS n\
\n EmptyRelation\
\n Projection: numbers.n + Int64(1)\
\n Filter: numbers.n < Int64(10)\
\n TableScan: numbers",
)
}

#[test]
fn recursive_ctes_enabled() {
fn recursive_ctes_disabled() {
let sql = "
WITH RECURSIVE numbers AS (
select 1 as n
@@ -1404,28 +1410,20 @@
)
select * from numbers;";

// manually setting up test here so that we can enable recursive ctes
// manually setting up test here so that we can disable recursive ctes
let mut context = MockContextProvider::default();
context.options_mut().execution.enable_recursive_ctes = true;
context.options_mut().execution.enable_recursive_ctes = false;

let planner = SqlToRel::new_with_options(&context, ParserOptions::default());
let result = DFParser::parse_sql_with_dialect(sql, &GenericDialect {});
let mut ast = result.unwrap();

let plan = planner
let err = planner
.statement_to_plan(ast.pop_front().unwrap())
.expect("recursive cte plan creation failed");

.expect_err("query should have failed");
assert_eq!(
format!("{plan:?}"),
"Projection: numbers.n\
\n SubqueryAlias: numbers\
\n RecursiveQuery: is_distinct=false\
\n Projection: Int64(1) AS n\
\n EmptyRelation\
\n Projection: numbers.n + Int64(1)\
\n Filter: numbers.n < Int64(10)\
\n TableScan: numbers"
"This feature is not implemented: Recursive CTEs are not enabled",
err.strip_backtrace()
);
}

4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -166,7 +166,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.collect_statistics false
datafusion.execution.enable_recursive_ctes false
datafusion.execution.enable_recursive_ctes true
datafusion.execution.listing_table_ignore_subdirectory true
datafusion.execution.max_buffered_batches_per_output_file 2
datafusion.execution.meta_fetch_concurrency 32
@@ -244,7 +244,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
datafusion.execution.enable_recursive_ctes false Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected
datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -83,7 +83,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
| datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). |
| datafusion.execution.enable_recursive_ctes | false | Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected |
| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |