From 2201ec32aba2af7fb003457e88c056ed1674fc98 Mon Sep 17 00:00:00 2001
From: Eduard Karacharov
Date: Fri, 3 Mar 2023 11:12:46 +0300
Subject: [PATCH] memory limited hash join

---
 datafusion/core/src/physical_plan/common.rs     |   6 +
 .../src/physical_plan/joins/cross_join.rs       |  36 +-
 .../core/src/physical_plan/joins/hash_join.rs   | 379 ++++++++++++------
 datafusion/core/tests/memory_limit.rs           |  10 +
 4 files changed, 289 insertions(+), 142 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 68f8882102f5..2f02aaa272c8 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -39,8 +39,14 @@ use std::task::{Context, Poll};
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
+/// [`MemoryReservation`] used across query execution streams
 pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
+/// [`MemoryReservation`] used at the query operator level.
+/// The `Option` wrapper allows initializing an empty reservation
+/// in the operator constructor, and setting it to the actual
+/// reservation at the stream level.
+pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;
+
 /// Stream of record batches
 pub struct SizedRecordBatchStream {
     schema: SchemaRef,

diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 0523a2e35d25..d4933b9d6e0e 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch;
 
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::MemoryConsumer;
-use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::common::{OperatorMemoryReservation, SharedMemoryReservation};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
@@ -60,6 +60,8 @@ pub struct CrossJoinExec {
     schema: SchemaRef,
     /// Build-side data
     left_fut: OnceAsync<JoinLeftData>,
+    /// Memory reservation for build-side data
+    reservation: OperatorMemoryReservation,
     /// Execution plan metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -83,6 +85,7 @@ impl CrossJoinExec {
             right,
             schema,
             left_fut: Default::default(),
+            reservation: Default::default(),
             metrics: ExecutionPlanMetricsSet::default(),
         }
     }
@@ -221,17 +224,29 @@ impl ExecutionPlan for CrossJoinExec {
         let stream = self.right.execute(partition, context.clone())?;
 
         let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
-        let reservation = Arc::new(Mutex::new(
-            MemoryConsumer::new(format!("CrossJoinStream[{partition}]"))
-                .register(context.memory_pool()),
-        ));
+
+        // Initialization of operator-level reservation
+        {
+            let mut reservation_lock = self.reservation.lock();
+            if reservation_lock.is_none() {
+                *reservation_lock = Some(Arc::new(Mutex::new(
+                    MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()),
+                )));
+            };
+        }
+
+        let reservation = self.reservation.lock().clone().ok_or_else(|| {
+            DataFusionError::Internal(
+                "Operator-level memory reservation is not initialized".to_string(),
+            )
+        })?;
 
         let left_fut = self.left_fut.once(|| {
             load_left_input(
                 self.left.clone(),
                 context,
                 join_metrics.clone(),
-                reservation.clone(),
+                reservation,
             )
         });
 
@@ -242,7 +257,6 @@ impl ExecutionPlan for CrossJoinExec {
             right_batch: Arc::new(parking_lot::Mutex::new(None)),
             left_index: 0,
             join_metrics,
-            reservation,
         }))
     }
@@ -346,8 +360,6 @@ struct CrossJoinStream {
     right_batch: Arc<parking_lot::Mutex<Option<RecordBatch>>>,
     /// join execution metrics
     join_metrics: BuildProbeJoinMetrics,
-    /// memory reservation
-    reservation: SharedMemoryReservation,
 }
 
 impl RecordBatchStream for CrossJoinStream {
@@ -452,10 +464,7 @@ impl CrossJoinStream {
                     Some(result)
                 }
-                other => {
-                    self.reservation.lock().free();
-                    other
-                }
+                other => other,
             })
     }
 }
@@ -683,6 +692,7 @@ mod tests {
             err.to_string(),
             "External error: Resources exhausted: Failed to allocate additional"
         );
+        assert_contains!(err.to_string(), "CrossJoinExec");
 
         Ok(())
     }

diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
index 7752e1ebf4e3..ba703122933b 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -32,11 +32,11 @@ use arrow::{
         Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type,
         UInt64Type, UInt8Type,
     },
+    util::bit_util,
 };
 use smallvec::{smallvec, SmallVec};
 use std::sync::Arc;
-use std::{any::Any, usize};
-use std::{time::Instant, vec};
+use std::{any::Any, usize, vec};
 
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
@@ -58,15 +58,17 @@ use hashbrown::raw::RawTable;
 use crate::physical_plan::{
     coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
+    common::{OperatorMemoryReservation, SharedMemoryReservation},
     expressions::Column,
     expressions::PhysicalSortExpr,
     hash_utils::create_hashes,
     joins::utils::{
         adjust_right_output_partitioning, build_join_schema, check_join_is_valid,
         combine_join_equivalence_properties, estimate_join_statistics,
-        partitioned_join_output_partitioning, ColumnIndex, JoinFilter, JoinOn,
+        partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
+        JoinFilter, JoinOn,
     },
-    metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+    metrics::{ExecutionPlanMetricsSet, MetricsSet},
     DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
     Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream,
     Statistics,
 };
@@ -76,7 +78,7 @@ use crate::logical_expr::JoinType;
 
 use crate::arrow::array::BooleanBufferBuilder;
 use crate::arrow::datatypes::TimeUnit;
-use crate::execution::context::TaskContext;
+use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
 
 use super::{
     utils::{OnceAsync, OnceFut},
@@ -86,7 +88,7 @@ use crate::physical_plan::joins::utils::{
     adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
     get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
 };
-use log::debug;
+use parking_lot::Mutex;
 use std::fmt;
 use std::task::Poll;
@@ -134,6 +136,8 @@ pub struct HashJoinExec {
     schema: SchemaRef,
     /// Build-side data
     left_fut: OnceAsync<JoinLeftData>,
+    /// Operator-level memory reservation for left data
+    reservation: OperatorMemoryReservation,
     /// Shares the `RandomState` for the hashing algorithm
     random_state: RandomState,
     /// Partitioning mode to use
     mode: PartitionMode,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// If null_equals_null is true, null == null else null != null
     pub(crate) null_equals_null: bool,
 }
 
-/// Metrics for HashJoinExec
-#[derive(Debug)]
-struct HashJoinMetrics {
-    /// Total time for joining probe-side batches to the build-side batches
-    probe_time: metrics::Time,
-    /// Total time for building hashmap
-    build_time: metrics::Time,
-    /// Number of batches consumed by this operator
-    input_batches: metrics::Count,
-    /// Number of rows consumed by this operator
-    input_rows: metrics::Count,
-    /// Number of batches produced by this operator
-    output_batches: metrics::Count,
-    /// Number of rows produced by this operator
-    output_rows: metrics::Count,
-}
-
-impl HashJoinMetrics {
-    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
-        let probe_time = MetricBuilder::new(metrics).subset_time("probe_time", partition);
-
-        let build_time = MetricBuilder::new(metrics).subset_time("build_time", partition);
-
-        let input_batches =
-            MetricBuilder::new(metrics).counter("input_batches", partition);
-
-        let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
-
-        let output_batches =
-            MetricBuilder::new(metrics).counter("output_batches", partition);
-
-        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
-
-        Self {
-            probe_time,
-            build_time,
-            input_batches,
-            input_rows,
-            output_batches,
-            output_rows,
-        }
-    }
-}
-
 impl HashJoinExec {
     /// Tries to create a new [HashJoinExec].
     /// # Error
@@ -226,6 +186,7 @@ impl HashJoinExec {
             join_type: *join_type,
             schema: Arc::new(schema),
             left_fut: Default::default(),
+            reservation: Default::default(),
             random_state,
             mode: partition_mode,
             metrics: ExecutionPlanMetricsSet::new(),
@@ -414,21 +375,57 @@ impl ExecutionPlan for HashJoinExec {
         let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>();
         let on_right = self.on.iter().map(|on| on.1.clone()).collect::<Vec<_>>();
 
+        let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
+
+        // Initialization of operator-level reservation
+        {
+            let mut operator_reservation_lock = self.reservation.lock();
+            if operator_reservation_lock.is_none() {
+                *operator_reservation_lock = Some(Arc::new(Mutex::new(
+                    MemoryConsumer::new("HashJoinExec").register(context.memory_pool()),
+                )));
+            };
+        }
+
+        let operator_reservation = self.reservation.lock().clone().ok_or_else(|| {
+            DataFusionError::Internal(
+                "Operator-level memory reservation is not initialized".to_string(),
+            )
+        })?;
+
+        // Initialization of stream-level reservation
+        let reservation = Arc::new(Mutex::new(
+            MemoryConsumer::new(format!("HashJoinStream[{partition}]"))
+                .register(context.memory_pool()),
+        ));
+
+        // Memory reservation for left-side data depends on PartitionMode:
+        // - operator-level for `CollectLeft` mode
+        // - stream-level for partitioned mode
+        //
+        // This approach avoids cases where left data could outlive its memory
+        // reservation, and avoids relying on `MemoryReservation` destructors
+        // to release memory back to the pool.
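+        //
+        // In `CollectLeft` mode a single build side is shared by all output
+        // streams, so it is accounted for once by the operator-level
+        // reservation, while in partitioned mode each stream builds its own
+        // left-side data from a single partition and tracks it with its
+        // stream-level reservation (see the `match` below).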
         let left_fut = match self.mode {
             PartitionMode::CollectLeft => self.left_fut.once(|| {
                 collect_left_input(
+                    None,
                     self.random_state.clone(),
                     self.left.clone(),
                     on_left.clone(),
                     context.clone(),
+                    join_metrics.clone(),
+                    operator_reservation.clone(),
                 )
             }),
-            PartitionMode::Partitioned => OnceFut::new(partitioned_left_input(
-                partition,
+            PartitionMode::Partitioned => OnceFut::new(collect_left_input(
+                Some(partition),
                 self.random_state.clone(),
                 self.left.clone(),
                 on_left.clone(),
                 context.clone(),
+                join_metrics.clone(),
+                reservation.clone(),
             )),
             PartitionMode::Auto => {
                 return Err(DataFusionError::Plan(format!(
@@ -453,9 +450,10 @@ impl ExecutionPlan for HashJoinExec {
             right: right_stream,
             column_indices: self.column_indices.clone(),
             random_state: self.random_state.clone(),
-            join_metrics: HashJoinMetrics::new(partition, &self.metrics),
+            join_metrics,
             null_equals_null: self.null_equals_null,
             is_exhausted: false,
+            reservation,
         }))
     }
 
@@ -493,90 +491,73 @@ impl ExecutionPlan for HashJoinExec {
 }
 
 async fn collect_left_input(
+    partition: Option<usize>,
     random_state: RandomState,
     left: Arc<dyn ExecutionPlan>,
     on_left: Vec<Column>,
     context: Arc<TaskContext>,
+    metrics: BuildProbeJoinMetrics,
+    reservation: SharedMemoryReservation,
 ) -> Result<JoinLeftData> {
     let schema = left.schema();
 
-    let start = Instant::now();
-    // merge all left parts into a single stream
-    let merge = {
-        if left.output_partitioning().partition_count() != 1 {
-            Arc::new(CoalescePartitionsExec::new(left))
-        } else {
-            left
-        }
-    };
-    let stream = merge.execute(0, context)?;
-
-    // This operation performs 2 steps at once:
-    // 1. creates a [JoinHashMap] of all batches from the stream
-    // 2. stores the batches in a vector.
-    let initial = (0, Vec::new());
-    let (num_rows, batches) = stream
-        .try_fold(initial, |mut acc, batch| async {
-            acc.0 += batch.num_rows();
-            acc.1.push(batch);
-            Ok(acc)
-        })
-        .await?;
-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
-    let mut hashes_buffer = Vec::new();
-    let mut offset = 0;
-    for batch in batches.iter() {
-        hashes_buffer.clear();
-        hashes_buffer.resize(batch.num_rows(), 0);
-        update_hash(
-            &on_left,
-            batch,
-            &mut hashmap,
-            offset,
-            &random_state,
-            &mut hashes_buffer,
-        )?;
-        offset += batch.num_rows();
-    }
-    // Merge all batches into a single batch, so we
-    // can directly index into the arrays
-    let single_batch = concat_batches(&schema, &batches, num_rows)?;
-
-    debug!(
-        "Built build-side of hash join containing {} rows in {} ms",
-        num_rows,
-        start.elapsed().as_millis()
-    );
-
-    Ok((hashmap, single_batch))
-}
-
-async fn partitioned_left_input(
-    partition: usize,
-    random_state: RandomState,
-    left: Arc<dyn ExecutionPlan>,
-    on_left: Vec<Column>,
-    context: Arc<TaskContext>,
-) -> Result<JoinLeftData> {
-    let schema = left.schema();
+    let (left_input, left_input_partition) = if let Some(partition) = partition {
+        (left, partition)
+    } else {
+        let merge = {
+            if left.output_partitioning().partition_count() != 1 {
+                Arc::new(CoalescePartitionsExec::new(left))
+            } else {
+                left
+            }
+        };
 
-    let start = Instant::now();
+        (merge, 0)
+    };
 
-    // Load 1 partition of left side in memory
-    let stream = left.execute(partition, context.clone())?;
+    // Depending on the partition argument, load a single partition or the
+    // whole left side into memory
+    let stream = left_input.execute(left_input_partition, context.clone())?;
 
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
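+    //
+    // Memory required to buffer the incoming batches is reserved batch by
+    // batch, so a query exceeding the pool limit fails during the build
+    // phase with a `Resources exhausted` error instead of overallocating.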
-    let initial = (0, Vec::new());
-    let (num_rows, batches) = stream
+    let initial = (Vec::new(), 0, metrics, reservation);
+    let (batches, num_rows, metrics, reservation) = stream
         .try_fold(initial, |mut acc, batch| async {
-            acc.0 += batch.num_rows();
-            acc.1.push(batch);
+            let batch_size = batch.get_array_memory_size();
+            // Reserve memory for incoming batch
+            acc.3.lock().try_grow(batch_size)?;
+            // Update metrics
+            acc.2.build_mem_used.add(batch_size);
+            acc.2.build_input_batches.add(1);
+            acc.2.build_input_rows.add(batch.num_rows());
+            // Update row count
+            acc.1 += batch.num_rows();
+            // Push batch to output
+            acc.0.push(batch);
             Ok(acc)
         })
         .await?;
 
+    // Estimation of memory size required for the hashtable, prior to allocation.
+    // Final result can be verified using `RawTable.allocation_info()`
+    //
+    // In the majority of cases hashbrown overestimates the bucket quantity
+    // to keep ~1/8 of the buckets empty. This formula leads to overallocation
+    // for small tables (< 8 elements) but is fine overall.
+    // E.g. for 1000 rows: (1000 * 8 / 7).next_power_of_two() = 2048 buckets,
+    // giving an estimate of 32 * 2048 + 2048 + 16 = 67_600 bytes.
+    let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
+        DataFusionError::Execution(
+            "usize overflow while estimating number of hashmap buckets".to_string(),
+        )
+    })? / 7)
+    .next_power_of_two();
+    // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+    // + 1 byte for each bucket
+    // + 16 bytes fixed
+    let estimated_hashtable_size = 32 * estimated_buckets + estimated_buckets + 16;
+
+    reservation.lock().try_grow(estimated_hashtable_size)?;
+    metrics.build_mem_used.add(estimated_hashtable_size);
+
     let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
@@ -597,13 +578,6 @@ async fn partitioned_left_input(
     // can directly index into the arrays
     let single_batch = concat_batches(&schema, &batches, num_rows)?;
 
-    debug!(
-        "Built build-side {} of hash join containing {} rows in {} ms",
-        partition,
-        num_rows,
-        start.elapsed().as_millis()
-    );
-
     Ok((hashmap, single_batch))
 }
 
@@ -667,11 +641,13 @@ struct HashJoinStream {
     /// There is nothing to process anymore and left side is processed in case of left join
     is_exhausted: bool,
     /// Metrics
-    join_metrics: HashJoinMetrics,
+    join_metrics: BuildProbeJoinMetrics,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
     /// If null_equals_null is true, null == null else null != null
     null_equals_null: bool,
+    /// Memory reservation
+    reservation: SharedMemoryReservation,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -1173,6 +1149,18 @@ impl HashJoinStream {
         };
         build_timer.done();
 
+        // Reserving memory for the visited_left_side bitmap in case it hasn't
+        // been initialized yet and the join type requires storing it
+        if self.visited_left_side.is_none()
+            && need_produce_result_in_final(self.join_type)
+        {
+            // TODO: Replace `ceil` wrapper with stable `div_ceil` after
+            // https://github.com/rust-lang/rust/issues/88581
+            let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
+            self.reservation.lock().try_grow(visited_bitmap_size)?;
+            self.join_metrics.build_mem_used.add(visited_bitmap_size);
+        }
+
         let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
             let num_rows = left_data.1.num_rows();
             if need_produce_result_in_final(self.join_type) {
@@ -1196,7 +1184,7 @@ impl HashJoinStream {
             Some(Ok(batch)) => {
                 self.join_metrics.input_batches.add(1);
                 self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.probe_time.timer();
+                let timer = self.join_metrics.join_time.timer();
 
                 // get the matched two indices for the on condition
                let left_right_indices = build_join_indices(
@@ -1252,7 +1240,7 @@ impl HashJoinStream {
                    result
                }
                None => {
-                    let timer = self.join_metrics.probe_time.timer();
+                    let timer = self.join_metrics.join_time.timer();
 
                    if need_produce_result_in_final(self.join_type) && !self.is_exhausted
                    {
                        // use the global left bitmap to produce the left indices and right indices
@@ -1309,10 +1297,13 @@ mod tests {
    use std::sync::Arc;
 
    use super::*;
+    use crate::execution::context::SessionConfig;
    use crate::physical_expr::expressions::BinaryExpr;
    use crate::prelude::SessionContext;
    use crate::{
        assert_batches_sorted_eq,
+        common::assert_contains,
+        execution::runtime_env::{RuntimeConfig, RuntimeEnv},
        physical_plan::{
            common,
            expressions::Column,
@@ -3040,4 +3031,134 @@ mod tests {
        );
    }
 
+    #[tokio::test]
+    async fn single_partition_join_overallocation() -> Result<()> {
+        let left = build_table(
+            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 11]),
+            ("b2", &vec![12, 13]),
+            ("c2", &vec![14, 15]),
+        );
+        let on = vec![(
+            Column::new_with_schema("a1", &left.schema()).unwrap(),
+            Column::new_with_schema("b2", &right.schema()).unwrap(),
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+            JoinType::RightSemi,
+            JoinType::RightAnti,
+        ];
+
+        for join_type in join_types {
+            let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+            let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+            let session_ctx =
+                SessionContext::with_config_rt(SessionConfig::default(), runtime);
+            let task_ctx = session_ctx.task_ctx();
+
+            let join = join(left.clone(), right.clone(), on.clone(), &join_type, false)?;
+
+            let stream = join.execute(0, task_ctx)?;
+            let err = common::collect(stream).await.unwrap_err();
+
+            assert_contains!(
+                err.to_string(),
+                "External error: Resources exhausted: Failed to allocate additional"
+            );
+
+            // Assert that the operator-level reservation attempts to overallocate
+            assert_contains!(err.to_string(), "HashJoinExec");
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn partitioned_join_overallocation() -> Result<()> {
+        // Prepare partitioned inputs for HashJoinExec
+        // No need to adjust partitioning, as execution should fail with a
+        // `Resources exhausted` error
+        let left_batch = build_table_i32(
+            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+        );
+        let left = Arc::new(
+            MemoryExec::try_new(
+                &[vec![left_batch.clone()], vec![left_batch.clone()]],
+                left_batch.schema(),
+                None,
+            )
+            .unwrap(),
+        );
+        let right_batch = build_table_i32(
+            ("a2", &vec![10, 11]),
+            ("b2", &vec![12, 13]),
+            ("c2", &vec![14, 15]),
+        );
+        let right = Arc::new(
+            MemoryExec::try_new(
+                &[vec![right_batch.clone()], vec![right_batch.clone()]],
+                right_batch.schema(),
+                None,
+            )
+            .unwrap(),
+        );
+        let on = vec![(
+            Column::new_with_schema("b1", &left_batch.schema())?,
+            Column::new_with_schema("b2", &right_batch.schema())?,
+        )];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+            JoinType::RightSemi,
+            JoinType::RightAnti,
+        ];
+
+        for join_type in join_types {
+            let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+            let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
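+            // 100-byte pool is well below the size of a single input batch,
+            // so the first build-side reservation should fail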
+            let session_config = SessionConfig::default().with_batch_size(50);
+            let session_ctx = SessionContext::with_config_rt(session_config, runtime);
+            let task_ctx = session_ctx.task_ctx();
+
+            let join = HashJoinExec::try_new(
+                left.clone(),
+                right.clone(),
+                on.clone(),
+                None,
+                &join_type,
+                PartitionMode::Partitioned,
+                &false,
+            )?;
+
+            let stream = join.execute(1, task_ctx)?;
+            let err = common::collect(stream).await.unwrap_err();
+
+            assert_contains!(
+                err.to_string(),
+                "External error: Resources exhausted: Failed to allocate additional"
+            );
+
+            // Assert that the stream-level reservation attempts to overallocate
+            assert_contains!(err.to_string(), "HashJoinStream[1]");
+        }
+
+        Ok(())
+    }
 }

diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index 75e26485d251..392e5494151a 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -74,6 +74,16 @@ async fn group_by_hash() {
     .await
 }
 
+#[tokio::test]
+async fn join_by_key() {
+    run_limit_test(
+        "select t1.* from t t1 JOIN t t2 ON t1.service = t2.service",
+        "Resources exhausted: Failed to allocate additional",
+        1_000,
+    )
+    .await
+}
+
 #[tokio::test]
 async fn cross_join() {
     run_limit_test(