Skip to content

Commit

Permalink
Initial implementation of default handling for pickled frames (pandas…
Browse files Browse the repository at this point in the history
…-dev#622)

* initial implementation of default handling for pickled frames

* MDP-3767 throw exceptions instead of falling back to default pickle behaviour

* updated the strict handler check mechanism to be at the library level, and then use os.environ (if set), else disabled by default

* sanitized the tests for the strict handler checks

* clarified the decision of having the handler_supports_read_option option in the do_read of version store instead of inside individual handlers
  • Loading branch information
willdealtry authored and dimosped committed Oct 25, 2018
1 parent eb23f47 commit 101c5e9
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 9 deletions.
5 changes: 2 additions & 3 deletions arctic/serialization/numpy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

DTN64_DTYPE = 'datetime64[ns]'


def _to_primitive(arr, string_max_len=None):
if arr.dtype.hasobject:
if len(arr) > 0:
Expand Down Expand Up @@ -145,8 +144,8 @@ def can_convert_to_records_without_objects(self, df, symbol):
return False
else:
if arr.dtype.hasobject:
log.info('Pandas dataframe %s contains Objects, saving as Blob' % symbol)
# Will fall-back to saving using Pickle
log.warning('Pandas dataframe %s contains Objects, saving as Blob' % symbol)
# Fall-back to saving using Pickle
return False
elif any([len(x[0].shape) for x in arr.dtype.fields.values()]):
log.info('Pandas dataframe %s contains >1 dimensional arrays, saving as Blob' % symbol)
Expand Down
10 changes: 9 additions & 1 deletion arctic/store/_ndarray_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,12 @@ def can_delete(self, version, symbol):
def can_read(self, version, symbol):
return version['type'] == self.TYPE

@staticmethod
def can_write_type(data):
    """Return True if *data* is a numpy ndarray (type check only; dtype checks happen in can_write)."""
    return isinstance(data, np.ndarray)

def can_write(self, version, symbol, data):
return isinstance(data, np.ndarray) and not data.dtype.hasobject
return self.can_write_type(data) and not data.dtype.hasobject

def _dtype(self, string, metadata=None):
if metadata is None:
Expand Down Expand Up @@ -241,6 +245,10 @@ def get_info(self, version):
ret['rows'] = int(version['up_to'])
return ret

@staticmethod
def read_options():
    """Read-time keyword options this store understands; interrogated by the strict handler checks."""
    return ['from_version']

def read(self, arctic_lib, version, symbol, read_preference=None, **kwargs):
index_range = self._index_range(version, symbol, **kwargs)
collection = arctic_lib.get_top_level_collection()
Expand Down
30 changes: 27 additions & 3 deletions arctic/store/_pandas_ndarray_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ def _datetime64_index(self, recarr):
return name
return None

def read_options(self):
    """Read options supported here: 'date_range' (consumed by _index_range)."""
    return ['date_range']

def _index_range(self, version, symbol, date_range=None, **kwargs):
""" Given a version, read the segment_index and return the chunks associated
with the date_range. As the segment index is (id -> last datetime)
Expand Down Expand Up @@ -143,8 +146,12 @@ class PandasSeriesStore(PandasStore):
TYPE = 'pandasseries'
SERIALIZER = SeriesSerializer()

@staticmethod
def can_write_type(data):
    """Return True if *data* is a pandas Series (type check only; dtype checks happen in can_write)."""
    return isinstance(data, Series)

def can_write(self, version, symbol, data):
if isinstance(data, Series):
if self.can_write_type(data):
if data.dtype == np.object_ or data.index.dtype == np.object_:
return self.SERIALIZER.can_convert_to_records_without_objects(data, symbol)
return True
Expand All @@ -158,6 +165,9 @@ def append(self, arctic_lib, version, symbol, item, previous_version, **kwargs):
item, md = self.SERIALIZER.serialize(item)
super(PandasSeriesStore, self).append(arctic_lib, version, symbol, item, previous_version, dtype=md, **kwargs)

def read_options(self):
    # NOTE(review): pure pass-through to the base class — appears redundant;
    # presumably kept so each handler explicitly advertises its read options.
    return super(PandasSeriesStore, self).read_options()

def read(self, arctic_lib, version, symbol, **kwargs):
    """Read the raw stored item via the base store, then deserialize it into a Series."""
    raw = super(PandasSeriesStore, self).read(arctic_lib, version, symbol, **kwargs)
    return self.SERIALIZER.deserialize(raw)
Expand All @@ -167,8 +177,12 @@ class PandasDataFrameStore(PandasStore):
TYPE = 'pandasdf'
SERIALIZER = DataFrameSerializer()

@staticmethod
def can_write_type(data):
    """Return True if *data* is a pandas DataFrame (type check only; dtype checks happen in can_write)."""
    return isinstance(data, DataFrame)

def can_write(self, version, symbol, data):
if isinstance(data, DataFrame):
if self.can_write_type(data):
if np.any(data.dtypes.values == 'object'):
return self.SERIALIZER.can_convert_to_records_without_objects(data, symbol)
return True
Expand All @@ -186,12 +200,19 @@ def read(self, arctic_lib, version, symbol, **kwargs):
item = super(PandasDataFrameStore, self).read(arctic_lib, version, symbol, **kwargs)
return self.SERIALIZER.deserialize(item)

def read_options(self):
    # NOTE(review): pure pass-through to the base class — appears redundant;
    # presumably kept so each handler explicitly advertises its read options.
    return super(PandasDataFrameStore, self).read_options()


class PandasPanelStore(PandasDataFrameStore):
TYPE = 'pandaspan'

@staticmethod
def can_write_type(data):
    """Return True if *data* is a pandas Panel (type check only; dtype checks happen in can_write)."""
    return isinstance(data, Panel)

def can_write(self, version, symbol, data):
if isinstance(data, Panel):
if self.can_write_type(data):
frame = data.to_frame(filter_observations=False)
if np.any(frame.dtypes.values == 'object'):
return self.SERIALIZER.can_convert_to_records_without_objects(frame, symbol)
Expand Down Expand Up @@ -220,5 +241,8 @@ def read(self, arctic_lib, version, symbol, **kwargs):
return item.iloc[:, 0].unstack().to_panel()
return item.to_panel()

def read_options(self):
    # NOTE(review): pure pass-through to the base class — appears redundant;
    # presumably kept so each handler explicitly advertises its read options.
    return super(PandasPanelStore, self).read_options()

def append(self, arctic_lib, version, symbol, item, previous_version, **kwargs):
    """Appending is not supported for Panels; always raises ValueError."""
    raise ValueError('Appending not supported for pandas.Panel')
4 changes: 4 additions & 0 deletions arctic/store/_pickle_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def read(self, mongoose_lib, version, symbol, **kwargs):
return pickle_compat_load(io.BytesIO(data))
return version['data']

@staticmethod
def read_options():
    """PickleStore supports no special read options (so e.g. a strict 'date_range' check fails)."""
    return []

def write(self, arctic_lib, version, symbol, item, previous_version):
try:
# If it's encodeable, then ship it
Expand Down
40 changes: 40 additions & 0 deletions arctic/store/version_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime as dt, timedelta
import logging
import os

import bson
from pymongo import ReadPreference
Expand All @@ -21,6 +22,7 @@

VERSION_STORE_TYPE = 'VersionStore'
_TYPE_HANDLERS = []
STRICT_WRITE_HANDLER_MATCH = bool(os.environ.get('STRICT_WRITE_HANDLER_MATCH'))


def register_versioned_storage(storageClass):
Expand All @@ -45,6 +47,10 @@ def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
# 32MB buffer for change notifications
mongo_retry(c.database.create_collection)('%s.changes' % c.name, capped=True, size=32 * 1024 * 1024)

if 'STRICT_WRITE_HANDLER_MATCH' in kwargs:
arctic_lib.set_library_metadata('STRICT_WRITE_HANDLER_MATCH',
bool(kwargs.pop('STRICT_WRITE_HANDLER_MATCH')))

for th in _TYPE_HANDLERS:
th.initialize_library(arctic_lib, **kwargs)
VersionStore._bson_handler.initialize_library(arctic_lib, **kwargs)
Expand Down Expand Up @@ -79,6 +85,14 @@ def __init__(self, arctic_lib):
# Do we allow reading from secondaries
self._allow_secondary = self._arctic_lib.arctic._allow_secondary
self._reset()
self._with_strict_handler = None

@property
def _with_strict_handler_match(self):
    """Lazily resolve whether strict handler matching is enabled for this library.

    Library-level metadata ('STRICT_WRITE_HANDLER_MATCH') takes precedence when
    present; otherwise the module-level default applies.  The resolved value is
    cached on the instance after the first lookup.
    """
    if self._with_strict_handler is None:
        lib_setting = self._arctic_lib.get_library_metadata('STRICT_WRITE_HANDLER_MATCH')
        if lib_setting is None:
            self._with_strict_handler = STRICT_WRITE_HANDLER_MATCH
        else:
            self._with_strict_handler = lib_setting
    return self._with_strict_handler

@mongo_retry
def _reset(self):
Expand Down Expand Up @@ -301,12 +315,21 @@ def _read_handler(self, version, symbol):
handler = self._bson_handler
return handler

@staticmethod
def handler_can_write_type(handler, data):
type_method = getattr(handler, "can_write_type", None)
if callable(type_method):
return type_method(data)
return False

def _write_handler(self, version, symbol, data, **kwargs):
handler = None
for h in _TYPE_HANDLERS:
if h.can_write(version, symbol, data, **kwargs):
handler = h
break
if self._with_strict_handler_match and self.handler_can_write_type(h, data):
raise ArcticException("Not falling back to default handler for %s" % symbol)
if handler is None:
version['type'] = 'default'
handler = self._bson_handler
Expand Down Expand Up @@ -384,10 +407,27 @@ def get_info(self, symbol, as_of=None):
return handler.get_info(version)
return {}

@staticmethod
def handler_supports_read_option(handler, option):
options_method = getattr(handler, "read_options", None)
if callable(options_method):
return option in options_method()

# If the handler doesn't support interrogation of its read options assume
# that it does support this option (i.e. fail-open)
return True

def _do_read(self, symbol, version, from_version=None, **kwargs):
if version.get('deleted'):
raise NoDataFoundException("No data found for %s in library %s" % (symbol, self._arctic_lib.get_name()))
handler = self._read_handler(version, symbol)
# We don't push the date_range check in the handler's code, since the "_with_strict_handler_match"
# value is configured on a per-library basis, and is part of the VersionStore instance.
if self._with_strict_handler_match and \
kwargs.get('date_range') and \
not self.handler_supports_read_option(handler, 'date_range'):
raise ArcticException("Date range arguments not supported by handler in %s" % symbol)

data = handler.read(self._arctic_lib, version, symbol, from_version=from_version, **kwargs)
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=version['version'],
metadata=version.pop('metadata', None), data=data,
Expand Down
72 changes: 71 additions & 1 deletion tests/integration/store/test_version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import struct
from datetime import datetime as dt, timedelta as dtd
import pandas as pd
from arctic import VERSION_STORE
from pandas.util.testing import assert_frame_equal
from pymongo.errors import OperationFailure
from pymongo.server_type import SERVER_TYPE
Expand All @@ -18,6 +19,7 @@
from arctic.exceptions import NoDataFoundException, DuplicateSnapshotException, ArcticException
from arctic.date import DateRange
from arctic.store import _version_store_utils
from arctic.store import version_store

from ...util import read_str_as_pandas
from arctic.date._mktz import mktz
Expand Down Expand Up @@ -1492,4 +1494,72 @@ def test_snapshot_list_versions_after_delete(library, library_name):

library.delete('symC')

assert {v['symbol'] for v in library.list_versions(snapshot='snapA')} == {'symA', 'symB', 'symC'}
assert {v['symbol'] for v in library.list_versions(snapshot='snapA')} == {'symA', 'symB', 'symC'}


def test_write_non_serializable_throws(arctic):
    """With strict handler matching enabled, an object-dtype frame must raise instead of pickling."""
    lib_name = 'write_hanlder_test'
    arctic.initialize_library(lib_name, VERSION_STORE)
    with patch('arctic.store.version_store.STRICT_WRITE_HANDLER_MATCH', True):
        library = arctic[lib_name]

        # Check that falling back to a pickle from a dataframe throws
        df = pd.DataFrame({'a': [dict(a=1)]})

        with pytest.raises(ArcticException):
            library.write('ns1', df)

        # Check that saving a regular dataframe succeeds with this option set
        library.write('ns2', ts1)
        assert_frame_equal(ts1, library.read('ns2').data)


def test_write_non_serializable_pickling_default(arctic):
    """By default (strict matching off) an object-dtype frame falls back to pickle and round-trips."""
    lib_name = 'write_hanlder_test'
    arctic.initialize_library(lib_name, VERSION_STORE)
    library = arctic[lib_name]
    df = pd.DataFrame({'a': [dict(a=1)]})
    library.write('ns3', df)
    assert_frame_equal(df, library.read('ns3').data)


def test_write_strict_no_daterange(arctic):
    """Under strict matching, a pickled item reads normally but rejects date_range reads."""
    lib_name = 'write_hanlder_test'
    arctic.initialize_library(lib_name, VERSION_STORE)

    # Write with pickling
    with patch('arctic.store.version_store.STRICT_WRITE_HANDLER_MATCH', True):
        library = arctic[lib_name]
        data = [dict(a=1)]
        library.write('ns4', data)

        # When the option is set, we should now be unable to read this item when we specify a
        # date range, even though it was written successfully
        with pytest.raises(ArcticException):
            library.read('ns4', date_range=DateRange(dt(2017, 1, 1), dt(2017, 1, 2)))

        assert data == library.read('ns4').data


def test_handler_check_default_false(arctic):
    """Strict handler matching is off when neither the env default nor library metadata enables it."""
    lib_name = 'write_hanlder_test1'
    arctic.initialize_library(lib_name, VERSION_STORE)
    assert arctic[lib_name]._with_strict_handler_match is False


def test_handler_check_default_osenviron(arctic):
    """The module-level (environment-driven) default is honoured when no library metadata overrides it."""
    with patch('arctic.store.version_store.STRICT_WRITE_HANDLER_MATCH', True):
        lib_name = 'write_hanlder_test2'
        arctic.initialize_library(lib_name, VERSION_STORE)
        assert arctic[lib_name]._with_strict_handler_match is True

def test_handler_check_set_false(arctic):
    """An explicit library-level STRICT_WRITE_HANDLER_MATCH=False wins over any default."""
    lib_name = 'write_hanlder_test3'
    arctic.initialize_library(lib_name, VERSION_STORE, STRICT_WRITE_HANDLER_MATCH=False)
    assert arctic[lib_name]._with_strict_handler_match is False


def test_handler_check_set_true(arctic):
    """An explicit library-level STRICT_WRITE_HANDLER_MATCH=True wins over any default."""
    lib_name = 'write_hanlder_test4'
    arctic.initialize_library(lib_name, VERSION_STORE, STRICT_WRITE_HANDLER_MATCH=True)
    assert arctic[lib_name]._with_strict_handler_match is True
2 changes: 1 addition & 1 deletion tests/unit/serialization/test_numpy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_can_convert_to_records_without_objects_returns_false_when_records_have_
with patch('arctic.serialization.numpy_records.log') as mock_log:
assert store.can_convert_to_records_without_objects(sentinel.df, 'my_symbol') is False

mock_log.info.assert_called_once_with('Pandas dataframe my_symbol contains Objects, saving as Blob')
mock_log.warning.assert_called_once_with('Pandas dataframe my_symbol contains Objects, saving as Blob')
store._to_records.assert_called_once_with(sentinel.df)


Expand Down

0 comments on commit 101c5e9

Please sign in to comment.