Skip to content

Commit

Permalink
Always detach objects from manager
Browse files Browse the repository at this point in the history
This fixes #152
  • Loading branch information
bikeshedder committed Oct 17, 2021
1 parent 5a97016 commit a2f3c8e
Showing 1 changed file with 31 additions and 26 deletions.
57 changes: 31 additions & 26 deletions src/managed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,10 @@ mod metrics;
pub mod reexports;
pub mod sync;

use std::{
collections::VecDeque,
fmt,
future::Future,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{
use std::{cell::RefCell, collections::VecDeque, fmt, future::Future, marker::PhantomData, ops::{Deref, DerefMut}, sync::{
atomic::{AtomicIsize, Ordering},
Arc, Mutex, Weak,
},
time::{Duration, Instant},
};
}, time::{Duration, Instant}};

use async_trait::async_trait;
use tokio::sync::{Semaphore, TryAcquireError};
Expand Down Expand Up @@ -133,6 +125,9 @@ pub struct Object<M: Manager> {

/// Pool to return the pooled object to.
pool: Weak<PoolInner<M>>,

/// Ready
ready: bool,
}

impl<M: Manager> Object<M> {
Expand Down Expand Up @@ -166,7 +161,11 @@ impl<M: Manager> Object<M> {
impl<M: Manager> Drop for Object<M> {
fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() {
pool.return_object(self.obj.take());
if self.ready {
pool.return_object(self.obj.take());
} else {
pool.manager.detach(&mut self.obj.take().unwrap().obj);
}
}
}
}
Expand Down Expand Up @@ -317,10 +316,15 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
.await?
};

let with_metrics = loop {
let obj = loop {
let queue_obj = self.inner.slots.lock().unwrap().vec.pop_front();
match queue_obj {
Some(mut with_metrics) => {
Some(with_metrics) => {
let mut obj = Object {
obj: Some(with_metrics),
pool: Arc::downgrade(&self.inner),
ready: false,
};
// Recycle existing object
let recycle_guard = DropGuard(|| {
let _ = self.inner.available.fetch_sub(1, Ordering::Relaxed);
Expand All @@ -332,7 +336,7 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
.inner
.hooks
.pre_recycle
.apply(&mut with_metrics, PoolError::PreRecycleHook)
.apply(&mut obj.obj.as_mut().unwrap(), PoolError::PreRecycleHook)
.await?
{
continue;
Expand All @@ -342,7 +346,7 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
self.inner.runtime,
TimeoutType::Recycle,
self.inner.config.timeouts.recycle,
self.inner.manager.recycle(&mut with_metrics.obj),
self.inner.manager.recycle(&mut *obj),
)
.await
.is_err()
Expand All @@ -355,18 +359,19 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
.inner
.hooks
.post_recycle
.apply(&mut with_metrics, PoolError::PostRecycleHook)
.apply(&mut obj.obj.as_mut().unwrap(), PoolError::PostRecycleHook)
.await?
{
continue;
}

with_metrics.metrics.recycle_count += 1;
with_metrics.metrics.recycled = Some(Instant::now());
obj.obj.as_mut().unwrap().metrics.recycle_count += 1;
obj.obj.as_mut().unwrap().metrics.recycled = Some(Instant::now());

recycle_guard.disarm();

break with_metrics;
obj.ready = true;
break obj;
}
None => {
// Create new object
Expand Down Expand Up @@ -395,19 +400,18 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
let _ = self.inner.available.fetch_add(1, Ordering::Relaxed);
self.inner.slots.lock().unwrap().size += 1;

break with_metrics;
break Object {
obj: Some(with_metrics),
pool: Arc::downgrade(&self.inner),
ready: true
};
}
}
};

available_guard.disarm();
permit.forget();

let obj = Object {
obj: Some(with_metrics),
pool: Arc::downgrade(&self.inner),
};

Ok(obj.into())
}

Expand All @@ -430,7 +434,8 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
while slots.size > slots.max_size {
if let Ok(permit) = self.inner.semaphore.try_acquire() {
permit.forget();
if slots.vec.pop_front().is_some() {
if let Some(mut obj) = slots.vec.pop_front() {
self.inner.manager.detach(&mut obj.obj);
slots.size -= 1;
}
} else {
Expand Down

0 comments on commit a2f3c8e

Please sign in to comment.