From 27710ad122a85684ba5ea4459dd9394f73c30d6b Mon Sep 17 00:00:00 2001 From: Kurtiss Hare Date: Fri, 5 Nov 2010 17:13:49 -0700 Subject: [PATCH] phiggy changes. --- setup.py | 1 + shrapnel/caching.py | 4 +- shrapnel/command.py | 1 - shrapnel/config.py | 237 -------------------------------------------- shrapnel/db.py | 174 +------------------------------- shrapnel/mongodb.py | 51 ---------- 6 files changed, 5 insertions(+), 463 deletions(-) delete mode 100644 shrapnel/config.py delete mode 100644 shrapnel/mongodb.py diff --git a/setup.py b/setup.py index ab946dc..d3d2810 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ 'Topic :: Software Development :: Libraries :: Python Modules' ], install_requires = [ +# 'confy', # 'tornado', # 'pymongo', # 'memcache' diff --git a/shrapnel/caching.py b/shrapnel/caching.py index d1230dc..a40601b 100644 --- a/shrapnel/caching.py +++ b/shrapnel/caching.py @@ -8,7 +8,7 @@ """ import bisect -import config +import confy import functools import itertools import os @@ -67,7 +67,7 @@ def decorated(*args, **kwargs): if not type(generator) == types.GeneratorType: raise_error("It must return a generator.") - mc = config.instance("memcache.{0}".format(memcached_instance)) + mc = confy.instance("memcache.{0}".format(memcached_instance)) if not callable(getattr(mc, 'get', None)) or not callable(getattr(mc, 'set', None)): raise_error("The instance for this function must support the .get(...) and .set(...) methods.") diff --git a/shrapnel/command.py b/shrapnel/command.py index 6b79f09..108f666 100644 --- a/shrapnel/command.py +++ b/shrapnel/command.py @@ -7,7 +7,6 @@ Copyright (c) 2010 Medium Entertainment, Inc. All rights reserved. """ -from shrapnel import config import logging import logging.handlers import optparse diff --git a/shrapnel/config.py b/shrapnel/config.py deleted file mode 100644 index 3081a56..0000000 --- a/shrapnel/config.py +++ /dev/null @@ -1,237 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -config.py - -Created by Kurtiss Hare on 2010-02-10. -Copyright (c) 2010 Medium Entertainment, Inc. All rights reserved. -""" - -import threading - -def _split_name(name): - if '.' not in name: - name += ".__default__" - return name.split('.') - -def instance(name, *args, **kwargs): - cls_name, method_name = _split_name(name) - provider = ProviderMetaclass.find(cls_name) - if provider._instance == None: - provider._instance = provider() - return provider._instance.__provide__(method_name) - -def settings(name): - cls_name, method_name = _split_name(name) - provider = ProviderMetaclass.find(cls_name) - if provider._instance == None: - provider._instance = provider() - return provider._instance.get_config(method_name) - -def list_instances(): - """ - Return a list of all top-level instance names. This will not include any - specific configuration. For instance, only 'foo' will be included in the - list if 'foo', 'foo.bar', 'foo.baz', etc are valid. - """ - return ProviderMetaclass._subclasses.keys() - - - -class ProviderMetaclass(type): - _subclasses = {} - - def __new__(cls, name, bases, attrs): - new_cls = type.__new__(cls, name, bases, attrs) - - if not attrs.pop('__abstract__', False): - namespace = attrs.get('__namespace__', False) - - if not namespace: - import re - match = re.match(r'^(.*)ConfigurationProvider$', name) - - if match: - namespace = match.group(1).lower() - else: - namespace = name - - cls._subclasses[namespace] = new_cls - - return new_cls - - @classmethod - def find(cls, cls_name): - try: - return cls._subclasses[cls_name] - except KeyError: - raise LookupError, "Couldn't find provider class for {0}".format(cls_name) - - -class SingletonProvider(object): - def __init__(self, *args, **kwargs): - self._instances = dict() - super(SingletonProvider, self).__init__(*args, **kwargs) - - def __provide__(self, method_name): - try: - result = self._instances[method_name] - except KeyError: - with threading.Lock(): - try: - result = self._instances[method_name] - except KeyError: - config = self.get_config(method_name) - result = self._instances[method_name] = self.construct(config) - - return result - - -class InstanceProvider(object): - def __provide__(self, method_name): - config = self. get_config(method_name) - return self.construct(config) - - -class Provider(object): - __metaclass__ = ProviderMetaclass - __abstract__ = True - _instance = None - - def __defaults__(self): - return dict() - - def construct(self, configuration): - return configuration - - def get_config(self, method_name): - config_method = getattr(self, method_name) - config = {} - config.update(self.__defaults__()) - config.update(config_method()) - return config - - def __init__(self): - self._instances = {} - super(Provider, self).__init__() - - def __default__(self): - return dict() - - def __provide__(self, method_name): - raise RuntimeError("A __provide__ method has not been set for this provider.") - - -class MongoConnectionProvider(SingletonProvider, Provider): - __abstract__ = True - - def construct(self, config): - import pymongo.connection - - l_port = int(config['port']) - r_host = config.get('r_host') - r_port = config.get('r_port') or l_port - - if r_host: - conn = pymongo.connection.Connection.paired( - (config['host'], l_port), - right=(r_host, int(r_port)) - ) - else: - conn = pymongo.connection.Connection( - config['host'], - l_port, - network_timeout = config['timeout'] - ) - - return conn[config['database']] - - def __defaults__(self): - return dict( - host = 'localhost', - port = 27017, - database = 'database', - timeout = None, - r_host = None, - r_port = None - ) - - -class MongoProvider(Provider): - __abstract__ = True - - class DummyDB(object): - def __getattr__(self, name): - raise NotImplementedError - - def __provide__(self, method_name): - from . import mongodb - config = self.get_config(method_name) - if config.get('dummy', False): - from warnings import warn - warn("Using Dummy Mongodb. If you don't know what this means, disable the dummy option in your mongo settings.") - return self.DummyDB() - - return mongodb.MongoHelper(method_name, config) - - def __defaults__(self): - # better option? - return dict( - provider = 'no_provider_defined' - ) - -_client_data = threading.local() -class MemcacheProvider(InstanceProvider, Provider): - __abstract__ = True - - def construct(self, config): - global _client_data - if not hasattr(_client_data, 'conn'): - import memcache - _client_data.conn = memcache.Client(["%s:%d" % (config['host'], config['port'])], debug=config['debug']) - return _client_data.conn - - def __defaults__(self): - return dict( - host = 'localhost', - port = 11211, - debug = 0 - ) - - -class DbPoolProvider(SingletonProvider, Provider): - __abstract__ = True - - def construct(self, config): - import db - return db.ConnectionPool.instance( - config['host'], - config['database'], - config['user'], - config['password'] - ) - - def __defaults__(self): - return dict( - host = 'localhost:3306', # '/path/to/mysql.sock' - database = 'database', - user = None, - password = None - ) - - -class DatabaseProvider(InstanceProvider, Provider): - __abstract__ = True - - def construct(self, config): - import db - return db.Connection(config['pool']) - - def __defaults__(self): - return dict( - pool = '__default__' - ) - - -class ConfigurationLookupError(LookupError): - pass diff --git a/shrapnel/db.py b/shrapnel/db.py index 34b05ce..f6c8faa 100644 --- a/shrapnel/db.py +++ b/shrapnel/db.py @@ -7,18 +7,8 @@ Copyright (c) 2010 Medium Entertainment, Inc. All rights reserved. """ -import collections -import datetime import functools -import security -import string -import threading -import tornado.database -import tornado.ioloop import types -import weakref - -from . import config def transaction(retries=1): @@ -35,7 +25,7 @@ def decorated(*args, **kwargs): except StopIteration: db = None - if type(db) != Connection: + if not hasattr(db, 'execute') or not callable(db.execute): raise RuntimeError('bad type for "db"') exception = None @@ -71,164 +61,4 @@ def __parameterize__(self, format_spec): def __str__(self): import itertools - return "({0})".format(', '.join(itertools.repeat("%s", len(self.list)))) - - -class _ParameterizingFormatter(string.Formatter): - def __init__(self, *args, **kwargs): - super(_ParameterizingFormatter, self).__init__(*args, **kwargs) - self.parameters = [] - - def format_field(self, value, format_spec): - if hasattr(value, '__parameterize__'): - parameters, formatted_value = value.__parameterize__(format_spec) - self.parameters.extend(parameters) - - return formatted_value - - self.parameters.append(value) - return "%s" - - -class ConnectionPool(object): - """ - Tornado database connection pool. acquire() to grab a connection and reliniquish(connection) to - move it back to the pool. Caching of connections is keyed off thread-local-storage. - """ - _instances = weakref.WeakValueDictionary() - _pruner = None - _pruner_lock = threading.Lock() - _tls_lock = threading.Lock() - - @classmethod - def instance(cls, *args, **kwargs): - normalized = cls.normalize_args(*args, **kwargs) - default_instance = cls(*args, **kwargs) - instance = cls._instances.setdefault(normalized, default_instance) - return instance - - @classmethod - def normalize_args(cls, host, database, user = None, password = None): - return (host, database, user, password) - - @classmethod - def prune(cls): - now = datetime.datetime.utcnow() - max_idle_time = datetime.timedelta(minutes = 1) - nonempty_pools = 0 - - for instance in cls._instances.values(): - if instance: - for pool in instance.pools.values(): - while pool: - connection, last_used_time = pool.popleft() - - if (now - last_used_time) <= max_idle_time: - nonempty_pools += 1 - pool.appendleft((connection, last_used_time)) - break - - if nonempty_pools == 0: - with cls._pruner_lock: - if cls._pruner: - cls._pruner.stop() - cls._pruner = None - - def __init__(self, *args, **kwargs): - self.tls = threading.local() - self.pools = weakref.WeakValueDictionary() - self.args = args - self.kwargs = kwargs - - @property - def thread_local_lock(self): - if not hasattr(self.tls, 'lock'): - with self._tls_lock: - if not hasattr(self.tls, 'lock'): - self.tls.lock = threading.Lock() - return self.tls.lock - - @property - def thread_local_pool(self): - if not hasattr(self.tls, 'pool'): - with self.thread_local_lock: - if not hasattr(self.tls, 'pool'): - self.tls.pool = collections.deque() - self.pools[id(self.tls.pool)] = self.tls.pool - return self.tls.pool - - def acquire(self): - try: - (connection, last_used_time) = self.thread_local_pool.pop() - except IndexError: - connection = self.create() - - return connection - - def create(self): - return tornado.database.Connection(*self.args, **self.kwargs) - - def reliniquish(self, connection): - self.thread_local_pool.append((connection, datetime.datetime.utcnow())) - - with self._pruner_lock: - if not self._pruner: - self._pruner = tornado.ioloop.PeriodicCallback(self.prune, 5000) - self._pruner.start() - - -class Connection(object): - def __init__(self, pool = '__default__'): - self.pool = config.instance("dbpool.{0}".format(pool)) - self._connection = None - - @property - def connection(self): - if not self._connection: - self._connection = self.pool.acquire() - return self._connection - - def __del__(self): - self.close() - - def close(self): - if self._connection: - self.pool.reliniquish(self._connection) - - def reconnect(self): - self.connection.reconnect() - - def iter(self, query, *format_args, **format_kwargs): - return self._call_with_reconnect(self.connection.iter, query, format_args, format_kwargs) - - def query(self, query, *format_args, **format_kwargs): - return self._call_with_reconnect(self.connection.query, query, format_args, format_kwargs) - - def get(self, query, *format_args, **format_kwargs): - return self._call_with_reconnect(self.connection.get, query, format_args, format_kwargs) - - def execute(self, query, *format_args, **format_kwargs): - return self._call_with_reconnect(self.connection.execute, query, format_args, format_kwargs) - - def executemany(self, query, *format_args, **format_kwargs): - return self._call_with_reconnect(self.connection.executemany, query, format_args, format_kwargs) - - def executecursor(self, query, *format_args, **format_kwargs): - def _executecursor(q, *params): - cursor = self.connection._db.cursor() - cursor.execute(q, params) - return cursor - - return self._call_with_reconnect(_executecursor, query, format_args, format_kwargs) - - def _call_with_reconnect(self, callable, query, format_args, format_kwargs): - formatter = _ParameterizingFormatter() - query = formatter.vformat(query, format_args, format_kwargs) - - try: - result = callable(query, *formatter.parameters) - except tornado.database.OperationalError: - self.reconnect() - result = callable(query, *formatter.parameters) - - return result \ No newline at end of file + return "({0})".format(', '.join(itertools.repeat("%s", len(self.list)))) \ No newline at end of file diff --git a/shrapnel/mongodb.py b/shrapnel/mongodb.py deleted file mode 100644 index bc2aa02..0000000 --- a/shrapnel/mongodb.py +++ /dev/null @@ -1,51 +0,0 @@ -import functools -import threading - -import pymongo.connection -import pymongo.errors - -from . import config - -class MongoHelper(object): - def __init__(self, method_name, config): - self.config = config - self.provider = config["provider"] - self.key = method_name - - def do(self, *args): - results = [] - exc = None - - db = self.get_db() - try: - for fn in args: - for i in xrange(0, 2): - try: - results.append(fn(db)) - except (pymongo.errors.AutoReconnect), e: - exc = e - else: - exc = None - break - - if exc: - raise exc - - finally: - db.connection.end_request() - - if len(args) == 1: - return results[0] - - return results - - def get_db(self): - """ - Get a new instance of the mongo db. - """ - manipulators = self.config.get('son_manipulators', []) - db = config.instance("{0}.{1}".format(self.provider, self.key)) - for manipulator in manipulators: - db.add_son_manipulator(manipulator) - return db -