Merge pull request pandas-dev#51 from bmoscon/master
Expose data info via VersionedItem
bmoscon committed Nov 24, 2015
2 parents 7eb4d43 + cb146b2 commit 1fb2271
Showing 12 changed files with 99 additions and 58 deletions.
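The effect of the change: read, write, append, and read_metadata now return a VersionedItem whose new info field is a per-handler dict, replacing the preformatted string that _get_info used to build. A minimal usage sketch (library and symbol names are illustrative, and a local MongoDB is assumed):

    import numpy as np
    from arctic import Arctic

    store = Arctic('localhost')
    store.initialize_library('scratch')
    library = store['scratch']

    library.write('MYARR', np.ones(1000))
    item = library.read('MYARR')
    item.info['handler']   # 'NdarrayStore'
    item.info['length']    # 1000

    # The same dict is available without pulling the data back:
    assert library.read_metadata('MYARR').info == item.info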
10 changes: 10 additions & 0 deletions arctic/store/_base_store.py
@@ -0,0 +1,10 @@
+class BaseStore(object):
+    def read(self, lib, version, symbol, **kwargs):
+        pass
+
+    def write(self, lib, version, symbol, item, previous_version):
+        pass
+
+    def get_info(self, lib, version, symbol, **kwargs):
+        pass

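BaseStore is a deliberately empty interface: every handler (NdarrayStore, PickleStore, and the pandas stores below) now inherits from it, so VersionStore can call get_info unconditionally instead of checking hasattr first. A hypothetical third-party handler would follow the same shape (the class below is illustrative and not part of this commit):

    from arctic.store._base_store import BaseStore

    class JsonStore(BaseStore):
        # Illustrative only: shows the override pattern, not a real handler.
        def get_info(self, lib, version, symbol, **kwargs):
            return {'handler': self.__class__.__name__, 'type': 'json'}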
25 changes: 10 additions & 15 deletions arctic/store/_ndarray_store.py
@@ -7,6 +7,7 @@
 import pymongo
 from pymongo.errors import OperationFailure, DuplicateKeyError

+from ._base_store import BaseStore
 from ..decorators import mongo_retry, dump_bad_documents
 from ..exceptions import UnhandledDtypeException
 from ._version_store_utils import checksum
@@ -36,7 +37,7 @@ def _promote(type1, type2):
     return np.dtype([(n, _promote(dtype1.fields[n][0], dtype2.fields.get(n, (None,))[0])) for n in dtype1.names])


-class NdarrayStore(object):
+class NdarrayStore(BaseStore):
     """Chunked store for arbitrary ndarrays, supporting append.

     for the simple example:
@@ -151,26 +152,20 @@ def _index_range(self, version, symbol, from_version=None, **kwargs):
         return from_index, None

     def get_info(self, arctic_lib, version, symbol, **kwargs):
+        ret = {}
         collection = arctic_lib.get_top_level_collection()
-        dtype = self._dtype(version['dtype'], version.get('dtype_metadata', {}))
-        length = int(version['up_to'])
+        ret['dtype'] = self._dtype(version['dtype'], version.get('dtype_metadata', {}))
+        ret['length'] = int(version['up_to'])

         spec = {'symbol': symbol,
                 'parent': version.get('base_version_id', version['_id']),
-                'segment': {'$lt': length}}
+                'segment': {'$lt': ret['length']}}

-        n_segments = collection.find(spec).count()
+        ret['n_segments'] = collection.find(spec).count()

-        est_size = dtype.itemsize * length
-        return """Handler: %s
-dtype: %s
-
-%d rows in %d segments
-Data size: %s bytes
-
-Version document:
-%s""" % (self.__class__.__name__, dtype, length, n_segments, est_size, pprint.pformat(version))
+        ret['est_size'] = ret['dtype'].itemsize * ret['length']
+        ret['handler'] = self.__class__.__name__
+        return ret

     def read(self, arctic_lib, version, symbol, read_preference=None, **kwargs):
         index_range = self._index_range(version, symbol, **kwargs)
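Callers that want the old human-readable summary can now format it themselves from the dict. For a 1000-element float64 array written in a single segment, the result would look roughly like this (a sketch inferred from the code above and the tests below, not verbatim output):

    info = library._get_info('MYARR')
    # {'handler': 'NdarrayStore',
    #  'dtype': dtype('float64'),
    #  'length': 1000,
    #  'n_segments': 1,
    #  'est_size': 8000}    # est_size == dtype.itemsize * length == 8 * 1000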
6 changes: 6 additions & 0 deletions arctic/store/_pandas_ndarray_store.py
@@ -194,6 +194,12 @@ def read(self, arctic_lib, version, symbol, read_preference=None, date_range=Non
         if date_range:
             item = self._daterange(item, date_range)
         return item

+    def get_info(self, arctic_lib, version, symbol, **kwargs):
+        ret = super(PandasStore, self).get_info(arctic_lib, version, symbol, **kwargs)
+        ret['type'] = version['type']
+        ret['col_names'] = version['dtype_metadata']
+        return ret
+

 def _start_end(date_range, dts):
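PandasStore layers the pandas-specific fields on top of the ndarray dict via super(): the version document's type tag and its dtype_metadata, which records the index and column names. For a plain Series this would look like (a sketch matching test_data_info_series below):

    info = library.read('pandas').info
    info['type']       # u'pandasseries'
    info['col_names']  # {u'index': [u'index'], u'columns': [u'values']}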
8 changes: 6 additions & 2 deletions arctic/store/_pickle_store.py
@@ -8,24 +8,28 @@
 import pprint

 from arctic.store._version_store_utils import checksum, pickle_compat_load
+from ._base_store import BaseStore

 _MAGIC_CHUNKED = '__chunked__'
 _CHUNK_SIZE = 15 * 1024 * 1024  # 15MB
 _MAX_BSON_ENCODE = 256 * 1024  # 256K - don't fill up the version document with encoded bson


-class PickleStore(object):
+class PickleStore(BaseStore):

     @classmethod
     def initialize_library(cls, *args, **kwargs):
         pass

     def get_info(self, arctic_lib, version, symbol, **kwargs):
+        ret = {}
         if 'blob' in version:
             if version['blob'] != _MAGIC_CHUNKED:
                 version['blob'] = "<Compressed pickle.....>"

-        return """Handler: %s\n\nVersion document:\n%s""" % (self.__class__.__name__, pprint.pformat(version))
+        ret['handler'] = self.__class__.__name__
+        ret['type'] = 'blob'
+        return ret

     def read(self, mongoose_lib, version, symbol, **kwargs):
         blob = version.get("blob")
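PickleStore keeps no per-segment bookkeeping, so its info dict is minimal (a sketch matching test_get_info_bson_object below):

    library.write('BLOB', {'foo': 'bar'})
    library._get_info('BLOB')
    # {'handler': 'PickleStore', 'type': 'blob'}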
2 changes: 1 addition & 1 deletion arctic/store/audit.py
@@ -84,7 +84,7 @@ def __init__(self, version_store, symbol, user, log, modify_timeseries=None, *ar
             versions = [x['version'] for x in self._version_store.list_versions(self._symbol, latest_only=True)]
             versions.append(0)
             self.base_ts = VersionedItem(symbol=self._symbol, library=None,
-                                         version=versions[0], metadata=None, data=None)
+                                         version=versions[0], metadata=None, data=None, info=None)
         except OperationFailure:
             #TODO: Current errors in mongo "Incorrect Number of Segments Returned"
             # This workaround should be removed once underlying problem is resolved.
25 changes: 13 additions & 12 deletions arctic/store/version_store.py
@@ -352,21 +352,19 @@ def _show_info(self, symbol, as_of=None):
         """
         print self._get_info(symbol, as_of)

-    def _get_info(self, symbol, as_of=None):
-        _version = self._read_metadata(symbol, as_of=as_of)
-        handler = self._read_handler(_version, symbol)
-        if hasattr(handler, "get_info"):
-            return handler.get_info(self._arctic_lib, _version, symbol)
-        else:
-            return """Handler: %s\n\nVersion document:\n%s""" % (handler.__class__.__name__, pprint.pformat(_version))
+    def _get_info(self, symbol, as_of=None, version=None):
+        if not version:
+            version = self._read_metadata(symbol, as_of=as_of)
+        handler = self._read_handler(version, symbol)
+        return handler.get_info(self._arctic_lib, version, symbol)

     def _do_read(self, symbol, version, from_version=None, **kwargs):
         handler = self._read_handler(version, symbol)
         data = handler.read(self._arctic_lib, version, symbol, from_version=from_version, **kwargs)
         if data is None:
             raise NoDataFoundException("No data found for %s in library %s" % (symbol, self._arctic_lib.get_name()))
         return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
-                             metadata=version.pop('metadata', None), data=data)
+                             metadata=version.pop('metadata', None), data=data, info=self._get_info(symbol, version=version))
     _do_read_retry = mongo_retry(_do_read)

     @mongo_retry
@@ -391,8 +389,9 @@ def read_metadata(self, symbol, as_of=None, allow_secondary=None):
             `False` : only allow reads from primary members
         """
         _version = self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary))
+        handler = self._read_handler(_version, symbol)
         return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=_version['version'],
-                             metadata=_version.pop('metadata', None), data=None)
+                             metadata=_version.pop('metadata', None), data=None, info=self._get_info(symbol, version=_version))

     def _read_metadata(self, symbol, as_of=None, read_preference=None):
         if read_preference is None:
@@ -461,8 +460,9 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser
                                                 sort=[('version', pymongo.DESCENDING)])

         if len(data) == 0 and previous_version is not None:
+            handler = self._read_handler(previous_version, symbol)
             return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=previous_version,
-                                 metadata=version.pop('metadata', None), data=None)
+                                 metadata=version.pop('metadata', None), data=None, info=self._get_info(symbol, version=previous_version))

         if upsert and previous_version is None:
             return self.write(symbol=symbol, data=data, prune_previous_version=prune_previous_version, metadata=metadata)
@@ -511,9 +511,10 @@ def append(self, symbol, data, metadata=None, prune_previous_version=True, upser

         if prune_previous_version and previous_version:
             self._prune_previous_versions(symbol)
+        handler = self._read_handler(version, symbol)

         return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
-                             metadata=version.pop('metadata', None), data=None)
+                             metadata=version.pop('metadata', None), data=None, info=self._get_info(symbol, version=version))

     def _publish_change(self, symbol, version):
         if self._publish_changes:
@@ -571,7 +572,7 @@ def write(self, symbol, data, metadata=None, prune_previous_version=True, **kwar
         self._publish_change(symbol, version)

         return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
-                             metadata=version.pop('metadata', None), data=None)
+                             metadata=version.pop('metadata', None), data=None, info=self._get_info(symbol, version=version))

     def _prune_previous_versions(self, symbol, keep_mins=120):
         """
9 changes: 5 additions & 4 deletions arctic/store/versioned_item.py
@@ -1,19 +1,20 @@
 from collections import namedtuple


-class VersionedItem(namedtuple('VersionedItem', ['symbol', 'library', 'data', 'version', 'metadata'])):
+class VersionedItem(namedtuple('VersionedItem', ['symbol', 'library', 'data', 'version', 'metadata', 'info'])):
     """
     Class representing a Versioned object in VersionStore.
     """
     def metadata_dict(self):
-        return {'symbol': self.symbol, 'library': self.library, 'version': self.version}
+        return {'symbol': self.symbol, 'library': self.library, 'version': self.version,
+                'info': self.info}

     def __repr__(self):
         return str(self)

     def __str__(self):
-        return "VersionedItem(symbol=%s,library=%s,data=%s,version=%s,metadata=%s" % \
-            (self.symbol, self.library, type(self.data), self.version, self.metadata)
+        return "VersionedItem(symbol=%s,library=%s,data=%s,version=%s,metadata=%s,info=%s" % \
+            (self.symbol, self.library, type(self.data), self.version, self.metadata, self.info)


 ChangedItem = namedtuple('ChangedItem', ['symbol', 'orig_version', 'new_version', 'changes'])
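Because VersionedItem is a namedtuple, info becomes a sixth positional field, and metadata_dict() surfaces it alongside symbol, library, and version. A quick sketch (values illustrative):

    item = VersionedItem(symbol='MYARR', library='scratch', data=None,
                         version=3, metadata=None,
                         info={'handler': 'PickleStore', 'type': 'blob'})
    item.metadata_dict()
    # {'symbol': 'MYARR', 'library': 'scratch', 'version': 3,
    #  'info': {'handler': 'PickleStore', 'type': 'blob'}}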
2 changes: 1 addition & 1 deletion tests/integration/store/test_ndarray_store.py
@@ -79,7 +79,7 @@ def test_save_read_big_2darray(library):
 def test_get_info_bson_object(library):
     ndarr = np.ones(1000)
     library.write('MYARR', ndarr)
-    assert library._get_info('MYARR').startswith('''Handler: NdarrayStore''')
+    assert library._get_info('MYARR')['handler'] == 'NdarrayStore'


 def test_save_read_ndarray_with_array_field(library):
24 changes: 23 additions & 1 deletion tests/integration/store/test_pandas_store.py
@@ -9,6 +9,7 @@
 import itertools
 from mock import Mock, patch
 import string
+from numpy import dtype as dtype

 from arctic.date import DateRange, mktz
 from arctic._compression import decompress
@@ -211,7 +212,6 @@ def test_append_pandas_dataframe(library):
 def test_empty_dataframe_multindex(library):
     df = DataFrame({'a': [], 'b': [], 'c': []})
     df = df.groupby(['a', 'b']).sum()
-    print df
     library.write('pandas', df)
     saved_df = library.read('pandas').data
     assert np.all(df.values == saved_df.values)
@@ -797,3 +797,25 @@ def test_daterange_fails_with_timezone_start(library):
     with pytest.raises(ValueError):
         library.read('MYARR', date_range=DateRange(start=dt(2015, 1, 1, tzinfo=mktz())))

+def test_data_info_series(library):
+    s = Series(data=[1, 2, 3], index=[4, 5, 6])
+    library.write('pandas', s)
+    md = library.read('pandas').info
+    assert md == library.read_metadata('pandas').info
+    assert md == {'dtype': dtype([('index', '<i8'), ('values', '<i8')]), 'length': 3, 'handler': 'PandasSeriesStore', 'est_size': 48, 'col_names': {u'index': [u'index'], u'columns': [u'values']}, 'n_segments': 1, 'type': u'pandasseries'}
+
+
+def test_data_info_df(library):
+    s = DataFrame(data=[1, 2, 3], index=[4, 5, 6])
+    library.write('pandas', s)
+    md = library.read('pandas').info
+    assert md == library.read_metadata('pandas').info
+    assert md == {'dtype': dtype([('index', '<i8'), ('0', '<i8')]), 'length': 3, 'handler': 'PandasDataFrameStore', 'est_size': 48, 'col_names': {u'index': [u'index'], u'columns': [u'0']}, 'n_segments': 1, 'type': u'pandasdf'}
+
+
+def test_data_info_cols(library):
+    i = MultiIndex.from_tuples([(1, "ab"), (2, "bb"), (3, "cb")])
+    s = DataFrame(data=[100, 200, 300], index=i)
+    library.write('test_data', s)
+    md = library.read_metadata('test_data').info
+    assert md == {'dtype': dtype([('level_0', '<i8'), ('level_1', 'S2'), ('0', '<i8')]), 'length': 3, 'handler': 'PandasDataFrameStore', 'est_size': 54, 'col_names': {u'index': [u'level_0', u'level_1'], u'columns': [u'0']}, 'n_segments': 1, 'type': u'pandasdf'}
2 changes: 1 addition & 1 deletion tests/integration/store/test_pickle_store.py
@@ -31,7 +31,7 @@ def test_save_read_bson_object(library):
 def test_get_info_bson_object(library):
     blob = {'foo': dt(2015, 1, 1), 'object': Arctic}
     library.write('BLOB', blob)
-    assert library._get_info('BLOB').startswith('Handler: PickleStore')
+    assert library._get_info('BLOB')['handler'] == 'PickleStore'


 def test_bson_large_object(library):
8 changes: 5 additions & 3 deletions tests/unit/store/test_version_item.py
@@ -8,10 +8,11 @@ def test_versioned_item_str():
                          library="ONEMINUTE",
                          data=pd.DataFrame(),
                          version=1.0,
-                         metadata={'metadata': 'foo'})
+                         metadata={'metadata': 'foo'},
+                         info=None)

     expected = "VersionedItem(symbol=sym,library=ONEMINUTE," + \
-               "data=<class 'pandas.core.frame.DataFrame'>,version=1.0,metadata={'metadata': 'foo'}"
+               "data=<class 'pandas.core.frame.DataFrame'>,version=1.0,metadata={'metadata': 'foo'},info=None"
     assert str(item) == expected
     assert repr(item) == expected

@@ -21,6 +22,7 @@ def test_versioned_item_str_handles_none():
                          library=None,
                          data=None,
                          version=None,
-                         metadata=None)
+                         metadata=None,
+                         info=None)

     assert str(item)