diff --git a/src/managed/mod.rs b/src/managed/mod.rs index 2c27daeb..cda1faae 100644 --- a/src/managed/mod.rs +++ b/src/managed/mod.rs @@ -59,18 +59,10 @@ mod metrics; pub mod reexports; pub mod sync; -use std::{ - collections::VecDeque, - fmt, - future::Future, - marker::PhantomData, - ops::{Deref, DerefMut}, - sync::{ +use std::{cell::RefCell, collections::VecDeque, fmt, future::Future, marker::PhantomData, ops::{Deref, DerefMut}, sync::{ atomic::{AtomicIsize, Ordering}, Arc, Mutex, Weak, - }, - time::{Duration, Instant}, -}; + }, time::{Duration, Instant}}; use async_trait::async_trait; use tokio::sync::{Semaphore, TryAcquireError}; @@ -133,6 +125,9 @@ pub struct Object { /// Pool to return the pooled object to. pool: Weak>, + + /// Ready + ready: bool, } impl Object { @@ -166,7 +161,11 @@ impl Object { impl Drop for Object { fn drop(&mut self) { if let Some(pool) = self.pool.upgrade() { - pool.return_object(self.obj.take()); + if self.ready { + pool.return_object(self.obj.take()); + } else { + pool.manager.detach(&mut self.obj.take().unwrap().obj); + } } } } @@ -317,10 +316,15 @@ impl>> Pool { .await? }; - let with_metrics = loop { + let obj = loop { let queue_obj = self.inner.slots.lock().unwrap().vec.pop_front(); match queue_obj { - Some(mut with_metrics) => { + Some(with_metrics) => { + let mut obj = Object { + obj: Some(with_metrics), + pool: Arc::downgrade(&self.inner), + ready: false, + }; // Recycle existing object let recycle_guard = DropGuard(|| { let _ = self.inner.available.fetch_sub(1, Ordering::Relaxed); @@ -332,7 +336,7 @@ impl>> Pool { .inner .hooks .pre_recycle - .apply(&mut with_metrics, PoolError::PreRecycleHook) + .apply(&mut obj.obj.as_mut().unwrap(), PoolError::PreRecycleHook) .await? { continue; @@ -342,7 +346,7 @@ impl>> Pool { self.inner.runtime, TimeoutType::Recycle, self.inner.config.timeouts.recycle, - self.inner.manager.recycle(&mut with_metrics.obj), + self.inner.manager.recycle(&mut *obj), ) .await .is_err() @@ -355,18 +359,19 @@ impl>> Pool { .inner .hooks .post_recycle - .apply(&mut with_metrics, PoolError::PostRecycleHook) + .apply(&mut obj.obj.as_mut().unwrap(), PoolError::PostRecycleHook) .await? { continue; } - with_metrics.metrics.recycle_count += 1; - with_metrics.metrics.recycled = Some(Instant::now()); + obj.obj.as_mut().unwrap().metrics.recycle_count += 1; + obj.obj.as_mut().unwrap().metrics.recycled = Some(Instant::now()); recycle_guard.disarm(); - break with_metrics; + obj.ready = true; + break obj; } None => { // Create new object @@ -395,7 +400,11 @@ impl>> Pool { let _ = self.inner.available.fetch_add(1, Ordering::Relaxed); self.inner.slots.lock().unwrap().size += 1; - break with_metrics; + break Object { + obj: Some(with_metrics), + pool: Arc::downgrade(&self.inner), + ready: true + }; } } }; @@ -403,11 +412,6 @@ impl>> Pool { available_guard.disarm(); permit.forget(); - let obj = Object { - obj: Some(with_metrics), - pool: Arc::downgrade(&self.inner), - }; - Ok(obj.into()) } @@ -430,7 +434,8 @@ impl>> Pool { while slots.size > slots.max_size { if let Ok(permit) = self.inner.semaphore.try_acquire() { permit.forget(); - if slots.vec.pop_front().is_some() { + if let Some(mut obj) = slots.vec.pop_front() { + self.inner.manager.detach(&mut obj.obj); slots.size -= 1; } } else {