From cf9bcca7fb1f98c2d5150cef194d99fbc0ee2a09 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Jul 2024 18:47:02 -0700 Subject: [PATCH 01/17] feat(11523): TrackConsumersPool impl which includes error messages with the top K consumers --- datafusion/execution/src/memory_pool/mod.rs | 2 +- datafusion/execution/src/memory_pool/pool.rs | 107 ++++++++++++++++++- 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index 92ed1b2918de..8b166091b5ed 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug { /// For help with allocation accounting, see the [proxy] module. /// /// [proxy]: crate::memory_pool::proxy -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct MemoryConsumer { name: String, can_spill: bool, diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index fd7724f3076c..fc114f094816 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -17,9 +17,10 @@ use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; +use hashbrown::HashMap; use log::debug; use parking_lot::Mutex; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; /// A [`MemoryPool`] that enforces no limit #[derive(Debug, Default)] @@ -240,6 +241,110 @@ fn insufficient_capacity_err( resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) } +/// A [`MemoryPool`] that tracks the consumers that have +/// reserved memory within the inner memory pool. 
+#[derive(Debug)] +pub struct TrackConsumersPool { + inner: I, + top: usize, + tracked_consumers: Mutex>, +} + +impl TrackConsumersPool { + /// Creates a new [`TrackConsumersPool`]. + /// + /// The `top` determines how many Top K [`MemoryConsumer`]s to include + /// in the reported [`DataFusionError::ResourcesExhausted`]. + pub fn new(inner: I, top: usize) -> Self { + Self { + inner, + top, + tracked_consumers: Default::default(), + } + } + + /// The top consumers in a report string. + fn report_top(&self) -> String { + let mut consumers = self + .tracked_consumers + .lock() + .iter() + .map(|(consumer, reserved)| { + (consumer.name().to_owned(), reserved.load(Ordering::Acquire)) + }) + .collect::>(); + consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering + + format!( + "The top memory consumers (across reservations) are: {}", + consumers[0..std::cmp::min(self.top, consumers.len())] + .iter() + .map(|(name, size)| format!("{name} consumed {:?} bytes", size)) + .collect::>() + .join(", ") + ) + } +} + +impl MemoryPool for TrackConsumersPool { + fn register(&self, consumer: &MemoryConsumer) { + self.inner.register(consumer); + self.tracked_consumers + .lock() + .insert_unique_unchecked(consumer.clone(), Default::default()); + } + + fn unregister(&self, consumer: &MemoryConsumer) { + self.inner.unregister(consumer); + self.tracked_consumers.lock().remove(consumer); + } + + fn grow(&self, reservation: &MemoryReservation, additional: usize) { + self.inner.grow(reservation, additional); + self.tracked_consumers + .lock() + .entry_ref(reservation.consumer()) + .and_modify(|bytes| { + bytes.fetch_add(additional as u64, Ordering::AcqRel); + }); + } + + fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { + self.inner.shrink(reservation, shrink); + self.tracked_consumers + .lock() + .entry_ref(reservation.consumer()) + .and_modify(|bytes| { + bytes.fetch_sub(shrink as u64, Ordering::AcqRel); + }); + } + + fn try_grow(&self, reservation: 
&MemoryReservation, additional: usize) -> Result<()> { + self.inner.try_grow(reservation, additional).map_err(|e| { + match e.find_root() { + DataFusionError::ResourcesExhausted(e) => { + DataFusionError::ResourcesExhausted( + e.to_owned() + ". " + &self.report_top(), + ) + } + _ => e, + } + })?; + + self.tracked_consumers + .lock() + .entry_ref(reservation.consumer()) + .and_modify(|bytes| { + bytes.fetch_add(additional as u64, Ordering::AcqRel); + }); + Ok(()) + } + + fn reserved(&self) -> usize { + self.inner.reserved() + } +} + #[cfg(test)] mod tests { use super::*; From d4e4da2469a9d6f589c8fe3b698c138f0d0f99b3 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Jul 2024 18:47:50 -0700 Subject: [PATCH 02/17] test(11523): unit tests for TrackConsumersPool --- datafusion/execution/src/memory_pool/pool.rs | 50 ++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index fc114f094816..df52835576c1 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -416,4 +416,54 @@ mod tests { let err = r4.try_grow(30).unwrap_err().strip_backtrace(); assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); } + + #[test] + fn test_tracked_consumers_pool() { + let pool: Arc = + Arc::new(TrackConsumersPool::new(GreedyMemoryPool::new(100), 3)); + + // Test: see error message when no consumers recorded yet + let mut r0 = MemoryConsumer::new("r0").register(&pool); + let expected = "Failed to allocate additional 150 bytes for r0 with 0 bytes already allocated - maximum available is 100. 
The top memory consumers (across reservations) are: r0 consumed 0 bytes"; + assert!( + matches!( + r0.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should error when no other consumers are reported" + ); + + // Test: use all the different interfaces to change reservation size + + // set r1=50, using grow and shrink + let mut r1 = MemoryConsumer::new("r1").register(&pool); + r1.grow(70); + r1.shrink(20); + + // set r2=15 using try_grow + let mut r2 = MemoryConsumer::new("r2").register(&pool); + r2.try_grow(15) + .expect("should succeed in memory allotment for r2"); + + // set r3=20 using try_resize + let mut r3 = MemoryConsumer::new("r3").register(&pool); + r3.try_resize(25) + .expect("should succeed in memory allotment for r3"); + r3.try_resize(20) + .expect("should succeed in memory allotment for r3"); + + // set r4=10 + // this should not be reported in top 3 + let mut r4 = MemoryConsumer::new("r4").register(&pool); + r4.grow(10); + + // Test: reports if new reservation causes error + // using the previous set size for other consumers + let mut r5 = MemoryConsumer::new("r5").register(&pool); + let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. 
The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + assert!(matches!( + r5.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + )); + } } From 92541c086d9cfc2ab152c60b6cf7f7bdf2059c21 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Jul 2024 19:01:10 -0700 Subject: [PATCH 03/17] test(11523): integration test for tracked consumers oom message --- datafusion/core/tests/memory_limit/mod.rs | 54 ++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index bc2c3315da59..c92b59904aae 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -26,6 +26,9 @@ use datafusion::assert_batches_eq; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::streaming::PartitionStream; +use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryPool, TrackConsumersPool, +}; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use futures::StreamExt; @@ -370,6 +373,39 @@ async fn oom_parquet_sink() { .await } +#[tokio::test] +async fn oom_with_tracked_consumer_pool() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.into_path().join("test.parquet"); + let _ = File::create(path.clone()).await.unwrap(); + + TestCase::new() + .with_config( + SessionConfig::new() + ) + .with_query(format!( + " + COPY (select * from t) + TO '{}' + STORED AS PARQUET OPTIONS (compression 'uncompressed'); + ", + path.to_string_lossy() + )) + .with_expected_errors(vec![ + "Failed to allocate additional", + "for ParquetSink(ArrowColumnWriter)", + "The top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)" + ]) + .with_memory_pool(Arc::new( + 
TrackConsumersPool::new( + GreedyMemoryPool::new(200_000), + 2 + ) + )) + .run() + .await +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] @@ -377,6 +413,7 @@ struct TestCase { query: Option, expected_errors: Vec, memory_limit: usize, + memory_pool: Option>, config: SessionConfig, scenario: Scenario, /// How should the disk manager (that allows spilling) be @@ -395,6 +432,7 @@ impl TestCase { expected_errors: vec![], memory_limit: 0, config: SessionConfig::new(), + memory_pool: None, scenario: Scenario::AccessLog, disk_manager_config: DiskManagerConfig::Disabled, expected_plan: vec![], @@ -424,6 +462,15 @@ impl TestCase { self } + /// Set the memory pool to be used + /// + /// This will override the memory_limit requested, + /// as the memory pool includes the limit. + fn with_memory_pool(mut self, memory_pool: Arc) -> Self { + self.memory_pool = Some(memory_pool); + self + } + /// Specify the configuration to use pub fn with_config(mut self, config: SessionConfig) -> Self { self.config = config; @@ -464,6 +511,7 @@ impl TestCase { query, expected_errors, memory_limit, + memory_pool, config, scenario, disk_manager_config, @@ -473,11 +521,15 @@ impl TestCase { let table = scenario.table(); - let rt_config = RuntimeConfig::new() + let mut rt_config = RuntimeConfig::new() // disk manager setting controls the spilling .with_disk_manager(disk_manager_config) .with_memory_limit(memory_limit, MEMORY_FRACTION); + if let Some(pool) = memory_pool { + rt_config = rt_config.with_memory_pool(pool); + }; + let runtime = RuntimeEnv::new(rt_config).unwrap(); // Configure execution From 8941fa30867826d02f6de13da177e8d801df45d7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Jul 2024 19:22:02 -0700 Subject: [PATCH 04/17] chore(11523): use nonzero usize --- datafusion/core/tests/memory_limit/mod.rs | 3 ++- datafusion/execution/src/memory_pool/pool.rs | 17 +++++++++++------ 2 files changed, 13 
insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index c92b59904aae..210205885ff9 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -33,6 +33,7 @@ use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use futures::StreamExt; use std::any::Any; +use std::num::NonZeroUsize; use std::sync::{Arc, OnceLock}; use tokio::fs::File; @@ -399,7 +400,7 @@ async fn oom_with_tracked_consumer_pool() { .with_memory_pool(Arc::new( TrackConsumersPool::new( GreedyMemoryPool::new(200_000), - 2 + NonZeroUsize::new(1).unwrap() ) )) .run() diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index df52835576c1..362a8a107927 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -20,7 +20,10 @@ use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; use hashbrown::HashMap; use log::debug; use parking_lot::Mutex; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::{ + num::NonZeroUsize, + sync::atomic::{AtomicU64, AtomicUsize, Ordering}, +}; /// A [`MemoryPool`] that enforces no limit #[derive(Debug, Default)] @@ -246,7 +249,7 @@ fn insufficient_capacity_err( #[derive(Debug)] pub struct TrackConsumersPool { inner: I, - top: usize, + top: NonZeroUsize, tracked_consumers: Mutex>, } @@ -255,7 +258,7 @@ impl TrackConsumersPool { /// /// The `top` determines how many Top K [`MemoryConsumer`]s to include /// in the reported [`DataFusionError::ResourcesExhausted`]. 
- pub fn new(inner: I, top: usize) -> Self { + pub fn new(inner: I, top: NonZeroUsize) -> Self { Self { inner, top, @@ -277,7 +280,7 @@ impl TrackConsumersPool { format!( "The top memory consumers (across reservations) are: {}", - consumers[0..std::cmp::min(self.top, consumers.len())] + consumers[0..std::cmp::min(self.top.into(), consumers.len())] .iter() .map(|(name, size)| format!("{name} consumed {:?} bytes", size)) .collect::>() .join(", ") @@ -419,8 +422,10 @@ mod tests { #[test] fn test_tracked_consumers_pool() { - let pool: Arc = - Arc::new(TrackConsumersPool::new(GreedyMemoryPool::new(100), 3)); + let pool: Arc = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new("r0").register(&pool); From 7ff15348fbab3c978bb86e3b477ac4423b6e00c6 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 25 Jul 2024 19:27:36 -0700 Subject: [PATCH 05/17] chore(11523): document what the memory insufficient_capacity_err is actually returning --- datafusion/execution/src/memory_pool/pool.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 362a8a107927..ba3ba6cf7f7b 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -235,6 +235,10 @@ impl MemoryPool for FairSpillPool { } } +/// Constructs a resources error based upon the individual [`MemoryReservation`]. +/// +/// The error references the total bytes affiliated with the reservation's +/// [`MemoryConsumer`], and not the total within the collective [`MemoryPool`]. 
#[inline(always)] fn insufficient_capacity_err( reservation: &MemoryReservation, From f3905de8b0659598ee34d41bd21ba599af7a75b4 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 26 Jul 2024 08:56:16 -0700 Subject: [PATCH 06/17] chore(11523): improve test failure coverage for TrackConsumersPool --- datafusion/execution/src/memory_pool/pool.rs | 86 ++++++++++++++++---- 1 file changed, 71 insertions(+), 15 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index ba3ba6cf7f7b..51347daf8bb1 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -431,17 +431,6 @@ mod tests { NonZeroUsize::new(3).unwrap(), )); - // Test: see error message when no consumers recorded yet - let mut r0 = MemoryConsumer::new("r0").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r0 with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: r0 consumed 0 bytes"; - assert!( - matches!( - r0.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) - ), - "should error when no other consumers are reported" - ); - // Test: use all the different interfaces to change reservation size // set r1=50, using grow and shrink @@ -467,12 +456,79 @@ mod tests { r4.grow(10); // Test: reports if new reservation causes error - // using the previous set size for other consumers + // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. 
The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; - assert!(matches!( - r5.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + assert!( + matches!( + r5.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should provide list of top memory consumers" + ); + } + + #[test] + fn test_tracked_consumers_pool_register() { + let pool: Arc = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), )); + + let same_name = "foo"; + + // Test: see error message when no consumers recorded yet + let mut r0 = MemoryConsumer::new(same_name).register(&pool); + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + assert!( + matches!( + r0.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should provide proper error when no reservations have been made yet" + ); + + // API: multiple registrations using the same hashed consumer, + // will be recognized as the same in the TrackConsumersPool. + + // Test: will be the same per Top Consumers reported. + r0.grow(10); // make r0=10, pool available=90 + let new_consumer_same_name = MemoryConsumer::new(same_name); + let mut r1 = new_consumer_same_name.clone().register(&pool); + // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. + // a followup PR will clarify this message "0 bytes already allocated for this reservation" + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. 
The top memory consumers (across reservations) are: foo consumed 10 bytes"; + assert!( + matches!( + r1.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90)" + ); + + // Test: will accumulate size changes per consumer, not per reservation + r1.grow(20); + let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + assert!( + matches!( + r1.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should provide proper error with same hashed consumer (a single foo=30 bytes, available=70)" + ); + + // Test: different hashed consumer, (even with the same name), + // will be recognized as different in the TrackConsumersPool + let consumer_with_same_name_but_different_hash = + MemoryConsumer::new(same_name).with_can_spill(true); + let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. 
The top memory consumers (across reservations) are: foo consumed 30 bytes, foo consumed 0 bytes"; + assert!( + matches!( + r2.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70)" + ); } } From ddc77005db6dc06577148e5be47236d022878305 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 26 Jul 2024 10:43:09 -0700 Subject: [PATCH 07/17] fix(11523): handle additive tracking of same hashed consumer, across different reservations --- datafusion/execution/src/memory_pool/pool.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 51347daf8bb1..a87f545d4a6f 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -250,6 +250,9 @@ fn insufficient_capacity_err( /// A [`MemoryPool`] that tracks the consumers that have /// reserved memory within the inner memory pool. +/// +/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`]. +/// The same consumer can have multiple reservations. 
#[derive(Debug)] pub struct TrackConsumersPool { inner: I, @@ -296,9 +299,17 @@ impl TrackConsumersPool { impl MemoryPool for TrackConsumersPool { fn register(&self, consumer: &MemoryConsumer) { self.inner.register(consumer); - self.tracked_consumers - .lock() - .insert_unique_unchecked(consumer.clone(), Default::default()); + + let mut guard = self.tracked_consumers.lock(); + if let Some(already_reserved) = guard.insert(consumer.clone(), Default::default()) + { + guard.entry_ref(consumer).and_modify(|bytes| { + bytes.fetch_add( + already_reserved.load(Ordering::Acquire), + Ordering::AcqRel, + ); + }); + } } fn unregister(&self, consumer: &MemoryConsumer) { From e71a7102d73a4b666812abaacea71cf9f9047caf Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 26 Jul 2024 11:01:00 -0700 Subject: [PATCH 08/17] refactor(11523): update error message to delineate multiple consumers with the same name but different hashes --- datafusion/execution/src/memory_pool/pool.rs | 29 ++++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index a87f545d4a6f..3937fa4718ee 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -273,6 +273,17 @@ impl TrackConsumersPool { } } + /// Determine if there are multiple [`MemoryConsumer`]s registered + /// which have the same name. + /// + /// This is very tied to the implementation of the memory consumer. + fn has_multiple_consumers(&self, name: &String) -> bool { + let consumer = MemoryConsumer::new(name); + let consumer_with_spill = consumer.clone().with_can_spill(true); + let guard = self.tracked_consumers.lock(); + guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill) + } + /// The top consumers in a report string. 
fn report_top(&self) -> String { let mut consumers = self @@ -280,7 +291,10 @@ impl TrackConsumersPool { .lock() .iter() .map(|(consumer, reserved)| { - (consumer.name().to_owned(), reserved.load(Ordering::Acquire)) + ( + (consumer.name().to_owned(), consumer.can_spill()), + reserved.load(Ordering::Acquire), + ) }) .collect::>(); consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering @@ -289,7 +303,16 @@ impl TrackConsumersPool { "The top memory consumers (across reservations) are: {}", consumers[0..std::cmp::min(self.top.into(), consumers.len())] .iter() - .map(|(name, size)| format!("{name} consumed {:?} bytes", size)) + .map(|((name, can_spill), size)| { + if self.has_multiple_consumers(name) { + format!( + "{name}(can_spill={}) consumed {:?} bytes", + can_spill, size + ) + } else { + format!("{name} consumed {:?} bytes", size) + } + }) .collect::>() .join(", ") ) @@ -533,7 +556,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes, foo consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. 
The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; assert!( matches!( r2.try_grow(150), From 944736860f9e7b053ff52d7530abf1655cc5bbb4 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 26 Jul 2024 11:27:03 -0700 Subject: [PATCH 09/17] test(11523): demonstrate the underlying pool behavior on deregister --- datafusion/execution/src/memory_pool/pool.rs | 67 ++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 3937fa4718ee..213648e4f5dc 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -565,4 +565,71 @@ mod tests { "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70)" ); } + + #[test] + fn test_tracked_consumers_pool_deregister() { + fn test_per_pool_type(pool: Arc) { + // Baseline: see the 2 memory consumers + let mut r0 = MemoryConsumer::new("r0").register(&pool); + r0.grow(10); + let r1_consumer = MemoryConsumer::new("r1"); + let mut r1 = r1_consumer.clone().register(&pool); + r1.grow(20); + let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. 
The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + assert!( + matches!( + r0.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + ), + "should provide proper error with both consumers" + ); + + // Test: unregister one + // only the remaining one should be listed + pool.unregister(&r1_consumer); + let expected_consumers = "The top memory consumers (across reservations) are: r0 consumed 10 bytes"; + assert!( + matches!( + r0.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_consumers) + ), + "should provide proper error with only 1 consumer left registered" + ); + + // Test: actual message we see is the `available is 70`. When it should be `available is 90`. + // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; + assert!( + matches!( + r0.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_70_available) + ), + "inner pool will still count all bytes for the deregistered consumer, until the reservation is dropped" + ); + + // Test: the registration needs to free itself (or be dropped), + // for the proper error message + r1.free(); + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; + assert!( + matches!( + r0.try_grow(150), + Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_90_available) + ), + "after reservation is free, the inner pool should correctly account the total bytes" + ); + } + + let tracked_spill_pool: Arc = Arc::new(TrackConsumersPool::new( + FairSpillPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + test_per_pool_type(tracked_spill_pool); + + let tracked_greedy_pool: 
Arc = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + test_per_pool_type(tracked_greedy_pool); + } } From a8383fa0a542c61441dc2cd2c1593b4d10865f41 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 26 Jul 2024 11:39:12 -0700 Subject: [PATCH 10/17] chore: make explicit what the insufficient_capacity_err() logs --- datafusion/execution/src/memory_pool/pool.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 213648e4f5dc..23ab314486c3 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -237,8 +237,9 @@ impl MemoryPool for FairSpillPool { /// Constructs a resources error based upon the individual [`MemoryReservation`]. /// -/// The error references the total bytes affiliated with the reservation's -/// [`MemoryConsumer`], and not the total within the collective [`MemoryPool`]. +/// The error references the `bytes already allocated` for the reservation, +/// and not the total within the collective [`MemoryPool`], +/// nor the total across multiple reservations with the same [`MemoryConsumer`]. 
#[inline(always)] fn insufficient_capacity_err( reservation: &MemoryReservation, From 1b1223f957db2c8dba18d85d2f78f8b1b985aa2c Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 26 Jul 2024 11:56:54 -0700 Subject: [PATCH 11/17] fix(11523): remove find_root() for the error, since the immediate inner child should be returning an OOM --- datafusion/execution/src/memory_pool/pool.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 23ab314486c3..9ce14e41adc1 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -362,16 +362,16 @@ impl MemoryPool for TrackConsumersPool { } fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { - self.inner.try_grow(reservation, additional).map_err(|e| { - match e.find_root() { + self.inner + .try_grow(reservation, additional) + .map_err(|e| match e { DataFusionError::ResourcesExhausted(e) => { DataFusionError::ResourcesExhausted( e.to_owned() + ". 
" + &self.report_top(), ) } _ => e, - } - })?; + })?; self.tracked_consumers .lock() From 9a77f907909283edce385e159812e52a7c7cfd98 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 29 Jul 2024 09:41:22 -0700 Subject: [PATCH 12/17] chore(11523): add result to logging of failed CI tests --- datafusion/execution/src/memory_pool/pool.rs | 65 ++++++++++++-------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 9ce14e41adc1..375d5cbe2d84 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -494,12 +494,14 @@ mod tests { // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let res = r5.try_grow(150); assert!( matches!( - r5.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) ), - "should provide list of top memory consumers" + "should provide list of top memory consumers, instead found {:?}", + res ); } @@ -515,12 +517,13 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. 
The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let res = r0.try_grow(150); assert!( matches!( - r0.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) ), - "should provide proper error when no reservations have been made yet" + "should provide proper error when no reservations have been made yet, instead found {:?}", res ); // API: multiple registrations using the same hashed consumer, @@ -533,23 +536,25 @@ mod tests { // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let res = r1.try_grow(150); assert!( matches!( - r1.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) ), - "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90)" + "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90), instead found {:?}", res ); // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. 
The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let res = r1.try_grow(150); assert!( matches!( - r1.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) ), - "should provide proper error with same hashed consumer (a single foo=30 bytes, available=70)" + "should provide proper error with same hashed consumer (a single foo=30 bytes, available=70), instead found {:?}", res ); // Test: different hashed consumer, (even with the same name), @@ -558,12 +563,13 @@ mod tests { MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let res = r2.try_grow(150); assert!( matches!( - r2.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) ), - "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70)" + "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70), instead found {:?}", res ); } @@ -577,47 +583,52 @@ mod tests { let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. 
The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let res = r0.try_grow(150); assert!( matches!( - r0.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) ), - "should provide proper error with both consumers" + "should provide proper error with both consumers, instead found {:?}", + res ); // Test: unregister one // only the remaining one should be listed pool.unregister(&r1_consumer); let expected_consumers = "The top memory consumers (across reservations) are: r0 consumed 10 bytes"; + let res = r0.try_grow(150); assert!( matches!( - r0.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_consumers) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_consumers) ), - "should provide proper error with only 1 consumer left registered" + "should provide proper error with only 1 consumer left registered, instead found {:?}", res ); // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). 
let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; + let res = r0.try_grow(150); assert!( matches!( - r0.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_70_available) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_70_available) ), - "inner pool will still count all bytes for the deregistered consumer, until the reservation is dropped" + "should find that the inner pool will still count all bytes for the deregistered consumer until the reservation is dropped, instead found {:?}", res ); // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; + let res = r0.try_grow(150); assert!( matches!( - r0.try_grow(150), - Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_90_available) + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available) ), - "after reservation is free, the inner pool should correctly account the total bytes" + "should correctly account the total bytes after reservation is free, instead found {:?}", res ); } From 09b20d289f53d3b61b976313f8731e8a6711f370 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 29 Jul 2024 12:49:09 -0700 Subject: [PATCH 13/17] fix(11523): splice error message to get consumers prior to error message --- datafusion/execution/src/memory_pool/pool.rs | 46 +++++++++++++------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 375d5cbe2d84..7263f4707b4a 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -246,7 +246,7 @@ fn 
insufficient_capacity_err( additional: usize, available: usize, ) -> DataFusionError { - resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) + resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {} for the total pool", additional, reservation.registration.consumer.name, reservation.size, available) } /// A [`MemoryPool`] that tracks the consumers that have @@ -367,7 +367,10 @@ impl MemoryPool for TrackConsumersPool { .map_err(|e| match e { DataFusionError::ResourcesExhausted(e) => { DataFusionError::ResourcesExhausted( - e.to_owned() + ". " + &self.report_top(), + parse_error_message_and_insert_top_consumers( + e.to_owned(), + self.report_top(), + ), ) } _ => e, @@ -387,6 +390,19 @@ impl MemoryPool for TrackConsumersPool { } } +/// This is very tied to the implementation of [`insufficient_capacity_err`]. +fn parse_error_message_and_insert_top_consumers( + mut error_msg: String, + top_consumers: String, +) -> String { + let end_of_oom_error = error_msg + .find("for the total pool") + .expect("should have OOM error") + + "for the total pool".len(); + error_msg.insert_str(end_of_oom_error, &format!(". 
{}", top_consumers)); + error_msg +} + #[cfg(test)] mod tests { use super::*; @@ -410,10 +426,10 @@ mod tests { assert_eq!(pool.reserved(), 4000); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0 for the total pool"); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0 for the total pool"); r1.shrink(1990); r2.shrink(2000); @@ -438,12 +454,12 @@ mod tests { .register(&pool); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40 for the total pool"); //Shrinking r2 to zero doesn't allow a3 to allocate more than 45 r2.free(); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40 for the total pool"); // But dropping r2 does drop(r2); @@ -456,7 +472,7 @@ mod tests { let mut r4 = MemoryConsumer::new("s4").register(&pool); let err = r4.try_grow(30).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources 
exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20 for the total pool"); } #[test] @@ -493,7 +509,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5 for the total pool. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; let res = r5.try_grow(150); assert!( matches!( @@ -516,7 +532,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100 for the total pool. The top memory consumers (across reservations) are: foo consumed 0 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -535,7 +551,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. 
// a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90 for the total pool. The top memory consumers (across reservations) are: foo consumed 10 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -547,7 +563,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: foo consumed 30 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -562,7 +578,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70 for the total pool. 
The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; let res = r2.try_grow(150); assert!( matches!( @@ -582,7 +598,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -608,7 +624,7 @@ mod tests { // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). 
- let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool"; let res = r0.try_grow(150); assert!( matches!( @@ -621,7 +637,7 @@ mod tests { // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); - let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90 for the total pool"; let res = r0.try_grow(150); assert!( matches!( From f4057957ee8d0767cc374045f92d86d04ecd8569 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 29 Jul 2024 13:24:01 -0700 Subject: [PATCH 14/17] Revert "fix(11523): splice error message to get consumers prior to error message" This reverts commit 09b20d289f53d3b61b976313f8731e8a6711f370. 
--- datafusion/execution/src/memory_pool/pool.rs | 46 +++++++------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 7263f4707b4a..375d5cbe2d84 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -246,7 +246,7 @@ fn insufficient_capacity_err( additional: usize, available: usize, ) -> DataFusionError { - resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {} for the total pool", additional, reservation.registration.consumer.name, reservation.size, available) + resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) } /// A [`MemoryPool`] that tracks the consumers that have @@ -367,10 +367,7 @@ impl MemoryPool for TrackConsumersPool { .map_err(|e| match e { DataFusionError::ResourcesExhausted(e) => { DataFusionError::ResourcesExhausted( - parse_error_message_and_insert_top_consumers( - e.to_owned(), - self.report_top(), - ), + e.to_owned() + ". " + &self.report_top(), ) } _ => e, @@ -390,19 +387,6 @@ impl MemoryPool for TrackConsumersPool { } } -/// This is very tied to the implementation of [`insufficient_capacity_err`]. -fn parse_error_message_and_insert_top_consumers( - mut error_msg: String, - top_consumers: String, -) -> String { - let end_of_oom_error = error_msg - .find("for the total pool") - .expect("should have OOM error") - + "for the total pool".len(); - error_msg.insert_str(end_of_oom_error, &format!(". 
{}", top_consumers)); - error_msg -} - #[cfg(test)] mod tests { use super::*; @@ -426,10 +410,10 @@ mod tests { assert_eq!(pool.reserved(), 4000); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0"); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0"); r1.shrink(1990); r2.shrink(2000); @@ -454,12 +438,12 @@ mod tests { .register(&pool); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40"); //Shrinking r2 to zero doesn't allow a3 to allocate more than 45 r2.free(); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40"); // But dropping r2 does drop(r2); @@ -472,7 +456,7 @@ mod tests { let mut r4 = MemoryConsumer::new("s4").register(&pool); let err = r4.try_grow(30).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources 
exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); } #[test] @@ -509,7 +493,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5 for the total pool. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; let res = r5.try_grow(150); assert!( matches!( @@ -532,7 +516,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100 for the total pool. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -551,7 +535,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. 
// a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90 for the total pool. The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -563,7 +547,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -578,7 +562,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. 
The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; let res = r2.try_grow(150); assert!( matches!( @@ -598,7 +582,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -624,7 +608,7 @@ mod tests { // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). 
- let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool"; + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; let res = r0.try_grow(150); assert!( matches!( @@ -637,7 +621,7 @@ mod tests { // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); - let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90 for the total pool"; + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; let res = r0.try_grow(150); assert!( matches!( From f75764e9de36c4d336a6f84203c95a460a9d3610 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 29 Jul 2024 13:37:36 -0700 Subject: [PATCH 15/17] fix(11523): fix without splicing error messages, and instead handle the proper error bubbling (msg wrapping) --- datafusion/core/tests/memory_limit/mod.rs | 2 +- datafusion/execution/src/memory_pool/pool.rs | 59 +++++++++++--------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 210205885ff9..4595e571938c 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -395,7 +395,7 @@ async fn oom_with_tracked_consumer_pool() { .with_expected_errors(vec![ "Failed to allocate additional", "for ParquetSink(ArrowColumnWriter)", - "The top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)" + "Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( diff --git a/datafusion/execution/src/memory_pool/pool.rs 
b/datafusion/execution/src/memory_pool/pool.rs index 375d5cbe2d84..f8c4772843b4 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -300,23 +300,17 @@ impl TrackConsumersPool { .collect::>(); consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering - format!( - "The top memory consumers (across reservations) are: {}", - consumers[0..std::cmp::min(self.top.into(), consumers.len())] - .iter() - .map(|((name, can_spill), size)| { - if self.has_multiple_consumers(name) { - format!( - "{name}(can_spill={}) consumed {:?} bytes", - can_spill, size - ) - } else { - format!("{name} consumed {:?} bytes", size) - } - }) - .collect::>() - .join(", ") - ) + consumers[0..std::cmp::min(self.top.into(), consumers.len())] + .iter() + .map(|((name, can_spill), size)| { + if self.has_multiple_consumers(name) { + format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size) + } else { + format!("{name} consumed {:?} bytes", size) + } + }) + .collect::>() + .join(", ") } } @@ -366,8 +360,12 @@ impl MemoryPool for TrackConsumersPool { .try_grow(reservation, additional) .map_err(|e| match e { DataFusionError::ResourcesExhausted(e) => { + // wrap OOM message in top consumers DataFusionError::ResourcesExhausted( - e.to_owned() + ". " + &self.report_top(), + provide_top_memory_consumers_to_error_msg( + e.to_owned(), + self.report_top(), + ), ) } _ => e, @@ -387,6 +385,13 @@ impl MemoryPool for TrackConsumersPool { } } +fn provide_top_memory_consumers_to_error_msg( + error_msg: String, + top_consumers: String, +) -> String { + format!("Resources exhausted with top memory consumers (across reservations) are: {}. 
Error: {}", top_consumers, error_msg) +} + #[cfg(test)] mod tests { use super::*; @@ -493,7 +498,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5"; let res = r5.try_grow(150); assert!( matches!( @@ -516,7 +521,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100"; let res = r0.try_grow(150); assert!( matches!( @@ -535,7 +540,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. 
The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90"; let res = r1.try_grow(150); assert!( matches!( @@ -547,7 +552,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70"; let res = r1.try_grow(150); assert!( matches!( @@ -562,7 +567,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. 
Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70"; let res = r2.try_grow(150); assert!( matches!( @@ -582,7 +587,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70"; let res = r0.try_grow(150); assert!( matches!( @@ -596,7 +601,7 @@ mod tests { // Test: unregister one // only the remaining one should be listed pool.unregister(&r1_consumer); - let expected_consumers = "The top memory consumers (across reservations) are: r0 consumed 10 bytes"; + let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -608,7 +613,7 @@ mod tests { // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). 
- let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70"; let res = r0.try_grow(150); assert!( matches!( @@ -621,7 +626,7 @@ mod tests { // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); - let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90"; let res = r0.try_grow(150); assert!( matches!( From c3ce60f211c64e083897703adc076807c20e3617 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 30 Jul 2024 12:15:43 -0700 Subject: [PATCH 16/17] chore: update docs to explain purpose of TrackConsumersPool Co-authored-by: Andrew Lamb --- datafusion/execution/src/memory_pool/pool.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index f8c4772843b4..83941b439aa7 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -252,6 +252,9 @@ fn insufficient_capacity_err( /// A [`MemoryPool`] that tracks the consumers that have /// reserved memory within the inner memory pool. /// +/// By tracking memory reservations more carefully this pool +/// can provide better error messages on the largest memory users +/// /// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`]. /// The same consumer can have multiple reservations. 
#[derive(Debug)] From c8c01961dd54df1703d7efdf216a8f63db8656b4 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 30 Jul 2024 14:13:57 -0700 Subject: [PATCH 17/17] refactor(11523): enable TrackConsumersPool to be used in runtime metrics --- datafusion/execution/src/memory_pool/pool.rs | 40 ++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 83941b439aa7..9cb6f207e59c 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -289,7 +289,7 @@ impl TrackConsumersPool { } /// The top consumers in a report string. - fn report_top(&self) -> String { + pub fn report_top(&self, top: usize) -> String { let mut consumers = self .tracked_consumers .lock() @@ -303,7 +303,7 @@ impl TrackConsumersPool { .collect::>(); consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering - consumers[0..std::cmp::min(self.top.into(), consumers.len())] + consumers[0..std::cmp::min(top, consumers.len())] .iter() .map(|((name, can_spill), size)| { if self.has_multiple_consumers(name) { @@ -367,7 +367,7 @@ impl MemoryPool for TrackConsumersPool { DataFusionError::ResourcesExhausted( provide_top_memory_consumers_to_error_msg( e.to_owned(), - self.report_top(), + self.report_top(self.top.into()), ), ) } @@ -652,4 +652,38 @@ mod tests { )); test_per_pool_type(tracked_greedy_pool); } + + #[test] + fn test_tracked_consumers_pool_use_beyond_errors() { + let upcasted: Arc = + Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + let pool: Arc = Arc::clone(&upcasted) + .downcast::>() + .unwrap(); + // set r1=20 + let mut r1 = MemoryConsumer::new("r1").register(&pool); + r1.grow(20); + // set r2=15 + let mut r2 = MemoryConsumer::new("r2").register(&pool); + r2.grow(15); + // set r3=45 + let mut r3 = MemoryConsumer::new("r3").register(&pool); + r3.grow(45); + + let downcasted 
= upcasted + .downcast::>() + .unwrap(); + + // Test: can get runtime metrics, even without an error thrown + let expected = "r3 consumed 45 bytes, r1 consumed 20 bytes"; + let res = downcasted.report_top(2); + assert_eq!( + res, expected, + "should provide list of top memory consumers, instead found {:?}", + res + ); + } }