diff --git a/aiopg/pool.py b/aiopg/pool.py index b1e1f70c..c70e8acd 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -153,13 +153,13 @@ def wait_closed(self): self._closed = True - def acquire(self): + def acquire(self, timeout=None): """Acquire free connection from the pool.""" - coro = self._acquire() + coro = self._acquire(timeout=timeout) return _PoolAcquireContextManager(coro, self) @asyncio.coroutine - def _acquire(self): + def _acquire(self, timeout=None): if self._closing: raise RuntimeError("Cannot acquire connection after closing pool") with (yield from self._cond): @@ -172,7 +172,11 @@ def _acquire(self): self._used.add(conn) return conn else: - yield from self._cond.wait() + waiter = self._cond.wait() + if timeout is not None: + yield from asyncio.wait_for(waiter, timeout=timeout, loop=self._loop) + else: + yield from waiter @asyncio.coroutine def _fill_free_pool(self, override_min): diff --git a/aiopg/sa/engine.py b/aiopg/sa/engine.py index 89871390..1221175e 100644 --- a/aiopg/sa/engine.py +++ b/aiopg/sa/engine.py @@ -133,14 +133,14 @@ def wait_closed(self): """Wait for closing all engine's connections.""" yield from self._pool.wait_closed() - def acquire(self): + def acquire(self, timeout=None): """Get a connection from pool.""" - coro = self._acquire() + coro = self._acquire(timeout=timeout) return _EngineAcquireContextManager(coro, self) @asyncio.coroutine - def _acquire(self): - raw = yield from self._pool.acquire() + def _acquire(self, timeout=None): + raw = yield from self._pool.acquire(timeout=timeout) conn = SAConnection(raw, self) return conn