Disable all caching on xarray.Variable
This is a follow-up to generalize the changes from pydata#1024:

- Caching and copy-on-write behavior has been moved to separate array classes
  that are explicitly used in `open_dataset` to wrap arrays loaded from disk (if
  `cache=True`).
- Dask specific logic has been removed from the caching/loading logic on
  `xarray.Variable`.
- Pickle no longer caches automatically under any circumstances.

Still needs tests for the `cache` argument to `open_dataset`, but everything
else seems to be working.
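
As a rough usage sketch of the behavior described above (the file and variable names here are made up, not from the commit), the `cache` argument to `open_dataset` controls whether values read from disk are kept in memory after the first access:

```python
import xarray as xr

ds = xr.open_dataset('example.nc')       # cache defaults to True
a = ds['temperature'].values             # first access reads from disk, then caches
b = ds['temperature'].values             # second access is served from memory

lazy = xr.open_dataset('example.nc', chunks={})  # dask-backed; cache defaults to False
```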
shoyer committed Nov 16, 2016
1 parent d66f673 commit 81f9b94
Showing 15 changed files with 239 additions and 97 deletions.
14 changes: 8 additions & 6 deletions doc/whats-new.rst
@@ -25,12 +25,14 @@ Breaking changes
merges will now succeed in cases that previously raised
``xarray.MergeError``. Set ``compat='broadcast_equals'`` to restore the
previous default.
- Pickling an xarray object based on the dask backend, or reading its
:py:meth:`values` property, won't automatically convert the array from dask
to numpy in the original object anymore.
If a dask object is used as a coord of a :py:class:`~xarray.DataArray` or
:py:class:`~xarray.Dataset`, its values are eagerly computed and cached,
but only if it's used to index a dim (e.g. it's used for alignment).
- Pickling an xarray object or reading its :py:attr:`~DataArray.values`
property no longer always caches values in a NumPy array. Caching
of ``.values`` read from netCDF files on disk is still the default when
:py:func:`open_dataset` is called with ``cache=True``.
By `Guido Imperiale <https://github.com/crusaderky>`_ and
`Stephan Hoyer <https://github.com/shoyer>`_.
- Coordinates used to index a dimension are now loaded eagerly into
:py:class:`pandas.Index` objects, instead of loading the values lazily.
By `Guido Imperiale <https://github.com/crusaderky>`_.

Deprecations
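A small sketch of the changelog entry above about ``.values`` no longer caching (the file and variable names are hypothetical):

```python
import xarray as xr

ds = xr.open_dataset('example.nc', chunks={})  # dask-backed dataset
vals = ds['precip'].values                     # computed to NumPy for this read only
# ds['precip'] still wraps a dask array; nothing was cached back onto the dataset
```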
32 changes: 29 additions & 3 deletions xarray/backends/api.py
@@ -13,13 +13,16 @@

from .. import backends, conventions
from .common import ArrayWriter
from ..core import indexing
from ..core.combine import auto_combine
from ..core.utils import close_on_error, is_remote_uri
from ..core.variable import Variable, IndexVariable
from ..core.pycompat import basestring

DATAARRAY_NAME = '__xarray_dataarray_name__'
DATAARRAY_VARIABLE = '__xarray_dataarray_variable__'


def _get_default_engine(path, allow_remote=False):
if allow_remote and is_remote_uri(path): # pragma: no cover
try:
@@ -117,10 +120,20 @@ def check_attr(name, value):
check_attr(k, v)


def _protect_dataset_variables_inplace(dataset, cache):
    for name, variable in dataset.variables.items():
        if name not in variable.dims:
            # no need to protect IndexVariable objects
            data = indexing.CopyOnWriteArray(variable._data)
            if cache:
                data = indexing.MemoryCachedArray(data)
            variable._data = data


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=None, drop_variables=None):
chunks=None, lock=None, cache=None, drop_variables=None):
"""Load and decode a dataset from a file or file-like object.
Parameters
@@ -162,14 +175,22 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
'netcdf4'.
chunks : int or dict, optional
If chunks is provided, it is used to load the new dataset into dask
arrays. This is an experimental feature; see the documentation for more
details.
arrays. ``chunks={}`` loads the dataset with dask using a single
chunk for all arrays. This is an experimental feature; see the
documentation for more details.
lock : False, True or threading.Lock, optional
If chunks is provided, this argument is passed on to
:py:func:`dask.array.from_array`. By default, a per-variable lock is
used when reading data from netCDF files with the netcdf4 and h5netcdf
engines to avoid issues with concurrent access when using dask's
multithreaded backend.
cache : bool, optional
If True, cache data loaded from the underlying datastore in memory as
NumPy arrays when accessed to avoid reading from the underlying
datastore multiple times. Defaults to True unless you specify the `chunks`
argument to use dask, in which case it defaults to False. Does not
change the behavior of coordinates corresponding to dimensions, which
always load their data from disk into a ``pandas.Index``.
drop_variables: string or iterable, optional
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
@@ -190,12 +211,17 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
        concat_characters = False
        decode_coords = False

    if cache is None:
        cache = chunks is None

    def maybe_decode_store(store, lock=False):
        ds = conventions.decode_cf(
            store, mask_and_scale=mask_and_scale, decode_times=decode_times,
            concat_characters=concat_characters, decode_coords=decode_coords,
            drop_variables=drop_variables)

        _protect_dataset_variables_inplace(ds, cache)

        if chunks is not None:
            try:
                from dask.base import tokenize
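For orientation, here is a standalone sketch of the wrapping that `_protect_dataset_variables_inplace` applies to each non-index variable's `._data`; this is an illustration only, and the helper name is made up:

```python
from xarray.core import indexing

def wrap_backend_array(raw_lazy_array, cache=True):
    # copy-on-write protection for every non-index variable ...
    data = indexing.CopyOnWriteArray(raw_lazy_array)
    if cache:
        # ... plus an in-memory cache layer when cache=True
        data = indexing.MemoryCachedArray(data)
    return data
```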
5 changes: 3 additions & 2 deletions xarray/backends/h5netcdf_.py
@@ -5,7 +5,8 @@

from .. import Variable
from ..core import indexing
from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
from ..core.utils import (FrozenOrderedDict, close_on_error, Frozen,
NoPickleMixin)
from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict

from .common import WritableCFDataStore
@@ -37,7 +38,7 @@ def _read_attributes(h5netcdf_var):
lsd_okay=False, backend='h5netcdf')


class H5NetCDFStore(WritableCFDataStore):
class H5NetCDFStore(WritableCFDataStore, NoPickleMixin):
"""Store for reading and writing data via h5netcdf
"""
def __init__(self, filename, mode='r', format=None, group=None,
6 changes: 3 additions & 3 deletions xarray/backends/netCDF4_.py
@@ -9,7 +9,7 @@
from .. import Variable
from ..conventions import pop_to, cf_encoder
from ..core import indexing
from ..core.utils import (FrozenOrderedDict, NDArrayMixin,
from ..core.utils import (FrozenOrderedDict, NDArrayMixin, NoPickleMixin,
close_on_error, is_remote_uri)
from ..core.pycompat import iteritems, basestring, OrderedDict, PY3

@@ -25,7 +25,7 @@
'|': 'native'}


class BaseNetCDF4Array(NDArrayMixin):
class BaseNetCDF4Array(NDArrayMixin, NoPickleMixin):
def __init__(self, array, is_remote=False):
self.array = array
self.is_remote = is_remote
@@ -176,7 +176,7 @@ def _extract_nc4_encoding(variable, raise_on_invalid=False, lsd_okay=True,
return encoding


class NetCDF4DataStore(WritableCFDataStore):
class NetCDF4DataStore(WritableCFDataStore, NoPickleMixin):
"""Store for reading and writing data via the Python-NetCDF4 library.
This store supports NetCDF3, NetCDF4 and OpenDAP datasets.
6 changes: 3 additions & 3 deletions xarray/backends/pynio_.py
@@ -4,13 +4,13 @@
import numpy as np

from .. import Variable
from ..core.utils import FrozenOrderedDict, Frozen, NDArrayMixin
from ..core.utils import FrozenOrderedDict, Frozen, NDArrayMixin, NoPickleMixin
from ..core import indexing

from .common import AbstractDataStore


class NioArrayWrapper(NDArrayMixin):
class NioArrayWrapper(NDArrayMixin, NoPickleMixin):
def __init__(self, array, ds):
self.array = array
self._ds = ds # make an explicit reference because pynio uses weakrefs
@@ -25,7 +25,7 @@ def __getitem__(self, key):
return self.array[key]


class NioDataStore(AbstractDataStore):
class NioDataStore(AbstractDataStore, NoPickleMixin):
"""Store for accessing datasets via PyNIO
"""
def __init__(self, filename, mode='r'):
6 changes: 3 additions & 3 deletions xarray/backends/scipy_.py
@@ -8,7 +8,7 @@

from .. import Variable
from ..core.pycompat import iteritems, basestring, OrderedDict
from ..core.utils import Frozen, FrozenOrderedDict
from ..core.utils import Frozen, FrozenOrderedDict, NoPickleMixin
from ..core.indexing import NumpyIndexingAdapter

from .common import WritableCFDataStore
@@ -29,7 +29,7 @@ def _decode_attrs(d):
for (k, v) in iteritems(d))


class ScipyArrayWrapper(NumpyIndexingAdapter):
class ScipyArrayWrapper(NumpyIndexingAdapter, NoPickleMixin):
def __init__(self, netcdf_file, variable_name):
self.netcdf_file = netcdf_file
self.variable_name = variable_name
@@ -57,7 +57,7 @@ def __getitem__(self, key):
return data


class ScipyDataStore(WritableCFDataStore):
class ScipyDataStore(WritableCFDataStore, NoPickleMixin):
"""Store for reading and writing data via scipy.io.netcdf.
This store has the advantage of being able to be initialized with a
8 changes: 8 additions & 0 deletions xarray/core/common.py
@@ -248,6 +248,14 @@ def __dir__(self):
if isinstance(item, basestring)]
return sorted(set(dir(type(self)) + extra_attrs))

    def __getstate__(self):
        """Get this object's state for pickling"""
        # self.__dict__ is the default pickle object, so we don't need to
        # implement our own __setstate__ method to make pickle work
        return self.__dict__


class SharedMethodsMixin(object):
"""Shared methods for Dataset, DataArray and Variable."""
21 changes: 10 additions & 11 deletions xarray/core/dataset.py
@@ -260,17 +260,12 @@ def load_store(cls, store, decoder=None):
return obj

def __getstate__(self):
"""Load data in-memory before pickling (except for Dask data)"""
for v in self.variables.values():
if not isinstance(v.data, dask_array_type):
v.load()
"""Get this object's state for pickling"""
# self.__dict__ is the default pickle object, so we don't need to
# implement our own __setstate__ method to make pickle work
state = self.__dict__.copy()
# throw away any references to datastores in the pickle
state['_file_obj'] = None
return state
return self.__dict__

@property
def variables(self):
@@ -331,9 +326,8 @@ def load(self):
working with many file objects on disk.
"""
# access .data to coerce everything to numpy or dask arrays
all_data = dict((k, v.data) for k, v in self.variables.items())
lazy_data = dict((k, v) for k, v in all_data.items()
if isinstance(v, dask_array_type))
lazy_data = {k: v._data for k, v in self.variables.items()
if isinstance(v._data, dask_array_type)}
if lazy_data:
import dask.array as da

@@ -343,6 +337,11 @@ def load(self):
for k, data in zip(lazy_data, evaluated_data):
self.variables[k].data = data

# load everything else sequentially
for k, v in self.variables.items():
if k not in lazy_data:
v.load()

return self

def compute(self):
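A hedged usage sketch of the reworked `Dataset.load` (file and dimension names are hypothetical): dask-backed variables are computed together, and any remaining lazily wrapped variables are then loaded one by one:

```python
import xarray as xr

ds = xr.open_dataset('example.nc', chunks={'time': 10})
ds.load()   # dask variables computed together in a single pass;
            # other lazy (file-backed) variables loaded sequentially afterwards
```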
40 changes: 40 additions & 0 deletions xarray/core/indexing.py
@@ -403,6 +403,46 @@ def __repr__(self):
(type(self).__name__, self.array, self.key))


class CopyOnWriteArray(utils.NDArrayMixin):
    def __init__(self, array):
        self.array = array
        self._copied = False

    def _ensure_copied(self):
        if not self._copied:
            self.array = np.array(self.array)
            self._copied = True

    def __array__(self, dtype=None):
        return np.asarray(self.array, dtype=dtype)

    def __getitem__(self, key):
        return type(self)(self.array[key])

    def __setitem__(self, key, value):
        self._ensure_copied()
        self.array[key] = value


class MemoryCachedArray(utils.NDArrayMixin):
    def __init__(self, array):
        self.array = array

    def _ensure_cached(self):
        if not isinstance(self.array, np.ndarray):
            self.array = np.asarray(self.array)

    def __array__(self, dtype=None):
        self._ensure_cached()
        return np.asarray(self.array, dtype=dtype)

    def __getitem__(self, key):
        return type(self)(self.array[key])

    def __setitem__(self, key, value):
        self.array[key] = value


def orthogonally_indexable(array):
if isinstance(array, np.ndarray):
return NumpyIndexingAdapter(array)
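To illustrate the semantics of the two new wrappers with plain NumPy data (an illustration only, not a test from the commit):

```python
import numpy as np
from xarray.core.indexing import CopyOnWriteArray, MemoryCachedArray

base = np.arange(4)
cow = CopyOnWriteArray(base)
cow[0] = -1                  # the first write copies the underlying data
assert base[0] == 0          # the original array is left untouched

cached = MemoryCachedArray(CopyOnWriteArray(base))
first = np.asarray(cached)   # loads and caches the data as a NumPy array
second = np.asarray(cached)  # served from the cached copy
```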
7 changes: 7 additions & 0 deletions xarray/core/utils.py
@@ -420,6 +420,13 @@ def __repr__(self):
return '%s(array=%r)' % (type(self).__name__, self.array)


class NoPickleMixin(object):
    def __getstate__(self):
        raise TypeError(
            'cannot pickle objects of type %r: call .compute() or .load() '
            'to load data into memory first.' % type(self))


@contextlib.contextmanager
def close_on_error(f):
"""Context manager to ensure that a file opened by xarray is closed if an
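A minimal sketch of what `NoPickleMixin` buys (the subclass here is a made-up stand-in for the datastores and array wrappers that mix it in): pickling fails loudly instead of silently pulling data through an unpicklable file handle:

```python
import pickle
from xarray.core.utils import NoPickleMixin

class FakeFileBackedArray(NoPickleMixin):
    """Stand-in for a datastore-backed array wrapper."""

try:
    pickle.dumps(FakeFileBackedArray())
except TypeError as err:
    print(err)   # cannot pickle objects of type ...: call .compute() or .load() ...
```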