Skip to content

Commit

Permalink
Fix djc#154 on top of v0.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
nmldiegues committed Apr 3, 2023
1 parent 8781965 commit 5c1fe4a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 24 deletions.
53 changes: 34 additions & 19 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::{max, min};
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -40,7 +41,13 @@ where
}

pub(crate) async fn start_connections(&self) -> Result<(), M::Error> {
let wanted = self.inner.internals.lock().wanted(&self.inner.statics);
let wanted = self
.inner
.internals
.lock()
.deref()
.borrow_mut()
.wanted(&self.inner.statics);
let mut stream = self.replenish_idle_connections(wanted);
while let Some(result) = stream.next().await {
result?
Expand All @@ -49,8 +56,8 @@ where
}

pub(crate) fn spawn_start_connections(&self) {
let mut locked = self.inner.internals.lock();
self.spawn_replenishing_approvals(locked.wanted(&self.inner.statics));
let locked = self.inner.internals.lock();
self.spawn_replenishing_approvals(locked.deref().borrow_mut().wanted(&self.inner.statics));
}

fn spawn_replenishing_approvals(&self, approvals: ApprovalIter) {
Expand Down Expand Up @@ -108,8 +115,9 @@ where
{
loop {
let mut conn = {
let mut locked = self.inner.internals.lock();
match locked.pop(&self.inner.statics) {
let locked = self.inner.internals.lock();
let mut pool = locked.deref().borrow_mut();
match pool.pop(&self.inner.statics) {
Some((conn, approvals)) => {
self.spawn_replenishing_approvals(approvals);
make_pooled_conn(self, conn)
Expand All @@ -134,8 +142,11 @@ where

let (tx, rx) = oneshot::channel();
{
let mut locked = self.inner.internals.lock();
let approvals = locked.push_waiter(tx, &self.inner.statics);
let locked = self.inner.internals.lock();
let approvals = locked
.deref()
.borrow_mut()
.push_waiter(tx, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
};

Expand All @@ -161,24 +172,27 @@ where
}
});

let mut locked = self.inner.internals.lock();
let locked = self.inner.internals.lock();
match conn {
Some(conn) => locked.put(conn, None, self.inner.clone()),
Some(conn) => locked
.deref()
.borrow_mut()
.put(conn, None, self.inner.clone()),
None => {
let approvals = locked.dropped(1, &self.inner.statics);
let approvals = locked.deref().borrow_mut().dropped(1, &self.inner.statics);
self.spawn_replenishing_approvals(approvals);
}
}
}

/// Returns information about the current state of the pool.
pub(crate) fn state(&self) -> State {
self.inner.internals.lock().state()
self.inner.internals.lock().deref().borrow().state()
}

fn reap(&self) {
let mut internals = self.inner.internals.lock();
let approvals = internals.reap(&self.inner.statics);
let internals = self.inner.internals.lock();
let approvals = internals.deref().borrow_mut().reap(&self.inner.statics);
self.spawn_replenishing_approvals(approvals);
}

Expand All @@ -205,16 +219,17 @@ where
match conn {
Ok(conn) => {
let conn = Conn::new(conn);
shared
.internals
.lock()
.put(conn, Some(approval), self.inner.clone());
shared.internals.lock().deref().borrow_mut().put(
conn,
Some(approval),
self.inner.clone(),
);
return Ok(());
}
Err(e) => {
if Instant::now() - start > self.inner.statics.connection_timeout {
let mut locked = shared.internals.lock();
locked.connect_failed(approval);
let locked = shared.internals.lock();
locked.deref().borrow_mut().connect_failed(approval);
return Err(e);
} else {
delay = max(Duration::from_millis(200), delay);
Expand Down
15 changes: 10 additions & 5 deletions bb8/src/internals.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::cell::RefCell;
use std::cmp::min;
use std::sync::Arc;
use std::time::Instant;

use futures_channel::oneshot;
use parking_lot::Mutex;
use parking_lot::ReentrantMutex;

use crate::api::{Builder, ManageConnection};
use std::collections::VecDeque;
use std::ops::Deref;

/// The guts of a `Pool`.
#[allow(missing_debug_implementations)]
Expand All @@ -16,7 +18,7 @@ where
{
pub(crate) statics: Builder<M>,
pub(crate) manager: M,
pub(crate) internals: Mutex<PoolInternals<M>>,
pub(crate) internals: ReentrantMutex<RefCell<PoolInternals<M>>>,
}

impl<M> SharedPool<M>
Expand All @@ -27,7 +29,7 @@ where
Self {
statics,
manager,
internals: Mutex::new(PoolInternals::default()),
internals: ReentrantMutex::new(RefCell::new(PoolInternals::default())),
}
}
}
Expand Down Expand Up @@ -188,8 +190,11 @@ impl<M: ManageConnection> InternalsGuard<M> {
impl<M: ManageConnection> Drop for InternalsGuard<M> {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let mut locked = self.pool.internals.lock();
locked.put(conn, None, self.pool.clone());
let locked = self.pool.internals.lock();
locked
.deref()
.borrow_mut()
.put(conn, None, self.pool.clone());
}
}
}
Expand Down

0 comments on commit 5c1fe4a

Please sign in to comment.