Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ConnectionPools): added connection pools class #85

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions python/neuroglancer/pipeline/connection_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import Queue
import threading
import time
import signal
from functools import partial

from google.cloud.storage import Client
from boto.s3.connection import S3Connection

from neuroglancer.pipeline.secrets import PROJECT_NAME, google_credentials_path, aws_credentials

class ConnectionPool(object):
    """
    This class is intended to be subclassed. See below.

    Creating fresh client or connection objects
    for Google or Amazon eventually starts causing
    breakdowns when too many connections open.

    To promote efficient resource use and prevent
    containers from dying, we create a ConnectionPool
    that allows for the reuse of connections.

    Storage interfaces may acquire and release connections
    when they need or finish using them.

    If the limit is reached, additional requests for
    acquiring connections will block until they can
    be serviced.
    """
    def __init__(self):
        # Unbounded queue of idle, reusable connections.
        self.pool = Queue.Queue(maxsize=0)
        # Count of connections currently checked out by callers.
        self.outstanding = 0
        # Guards `outstanding` and the get-or-create step in get_connection.
        self._lock = threading.Lock()

        def handler(signum, frame):
            self.reset_pool()

        # NOTE(review): signal.signal only works from the main thread, and
        # each new pool instance replaces any previously registered
        # SIGINT/SIGTERM handler (including another pool's) — confirm this
        # is acceptable when multiple pools coexist.
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)

    def total_connections(self):
        """Return idle + checked-out connection count (approximate under concurrency)."""
        return self.pool.qsize() + self.outstanding

    def _create_connection(self):
        """Subclasses return a fresh provider-specific connection object."""
        raise NotImplementedError

    def get_connection(self):
        """Acquire a pooled connection, creating one if the pool is empty.

        Connection creation happens while holding the lock, which serializes
        new-connection bursts across threads.
        """
        with self._lock:
            try:
                conn = self.pool.get(block=False)
                self.pool.task_done()
            except Queue.Empty:
                conn = self._create_connection()
            finally:
                self.outstanding += 1

        return conn

    def release_connection(self, conn):
        """Return a connection to the pool for reuse. None is ignored."""
        if conn is None:
            return

        self.pool.put(conn)
        with self._lock:
            self.outstanding -= 1

    def _close_function(self):
        """Subclasses return a callable that closes one connection."""
        return lambda x: x  # no-op

    def reset_pool(self):
        """Close every idle connection and zero the outstanding count."""
        closefn = self._close_function()
        while True:
            try:
                # Bug fix: the original checked qsize() and then issued a
                # *blocking* get(). If another thread drained the queue
                # between the check and the get, reset_pool would hang
                # forever (and its `except Queue.Empty` was unreachable).
                # A non-blocking get makes the empty case an exception we
                # can actually catch.
                conn = self.pool.get(block=False)
            except Queue.Empty:
                break
            closefn(conn)
            self.pool.task_done()

        with self._lock:
            self.outstanding = 0

    def __del__(self):
        self.reset_pool()

class S3ConnectionPool(ConnectionPool):
    """Pool of boto S3 connections authenticated from the shared AWS secrets."""

    def _create_connection(self):
        # Pull both halves of the credential pair from the project-wide store.
        access_key = aws_credentials['AWS_ACCESS_KEY_ID']
        secret_key = aws_credentials['AWS_SECRET_ACCESS_KEY']
        return S3Connection(access_key, secret_key)

    def _close_function(self):
        # S3 connections hold live sockets, so reset_pool closes them explicitly.
        def _close(conn):
            return conn.close()
        return _close

class GCloudConnectionPool(ConnectionPool):
    """Pool of google-cloud storage clients for the configured project.

    Inherits the no-op close function: the storage Client exposes no
    explicit close here, so reset_pool simply drops references.
    """

    def _create_connection(self):
        # Authenticate each pooled client from the shared service-account key file.
        client = Client.from_service_account_json(
            google_credentials_path, project=PROJECT_NAME)
        return client
54 changes: 54 additions & 0 deletions python/test/test_connectionpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from tqdm import tqdm
import boto.s3.key

from neuroglancer.pipeline.connection_pool import S3ConnectionPool, GCloudConnectionPool
from neuroglancer.pipeline.threaded_queue import ThreadedQueue
from neuroglancer.pipeline import Storage

# Module-level singletons: every worker thread spawned by the stress tests
# below shares these same two pools.
S3_POOL = S3ConnectionPool()
GC_POOL = GCloudConnectionPool()

def test_gc_stresstest():
    """Stress the shared GCloud pool: 200 threads performing 500 downloads,
    exercising concurrent get_connection/release_connection cycles."""
    with Storage('gs://neuroglancer/removeme/connection_pool/', n_threads=0) as stor:
        stor.put_file('test', 'some string')

    n_trials = 500
    pbar = tqdm(total=n_trials)

    def create_conn(interface):
        conn = GC_POOL.get_connection()
        # assert GC_POOL.total_connections() <= GC_POOL.max_connections * 5
        try:
            bucket = conn.get_bucket('neuroglancer')
            blob = bucket.get_blob('removeme/connection_pool/test')
            blob.download_as_string()
        finally:
            # Bug fix: release even when the download raises; otherwise a
            # failed task leaks the connection and permanently inflates the
            # pool's `outstanding` count.
            GC_POOL.release_connection(conn)
        pbar.update()

    with ThreadedQueue(n_threads=200) as tq:
        for _ in xrange(n_trials):
            tq.put(create_conn)

    pbar.close()

def test_s3_stresstest():
    """Stress the shared S3 pool: 200 threads performing 500 downloads,
    exercising concurrent get_connection/release_connection cycles."""
    with Storage('s3://neuroglancer/removeme/connection_pool/', n_threads=0) as stor:
        stor.put_file('test', 'some string')

    n_trials = 500
    pbar = tqdm(total=n_trials)

    def create_conn(interface):
        conn = S3_POOL.get_connection()
        # assert S3_POOL.total_connections() <= S3_POOL.max_connections * 5
        try:
            bucket = conn.get_bucket('neuroglancer')
            k = boto.s3.key.Key(bucket)
            k.key = 'removeme/connection_pool/test'
            k.get_contents_as_string()
        finally:
            # Bug fix: release even when the download raises; otherwise a
            # failed task leaks the connection and permanently inflates the
            # pool's `outstanding` count.
            S3_POOL.release_connection(conn)
        pbar.update()

    with ThreadedQueue(n_threads=200) as tq:
        for _ in xrange(n_trials):
            tq.put(create_conn)

    pbar.close()