Fixes too many files open error: scipy backend
pwolfram committed Feb 5, 2017
1 parent f180b68 commit 8db354a
Showing 3 changed files with 76 additions and 36 deletions.
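In short, open_dataset() gains an autoclose keyword and the scipy backend learns to lazily reopen its file, so reading many files no longer exhausts OS file descriptors. A minimal sketch of the user-facing behavior this enables (file paths are hypothetical):

    import xarray as xr

    # With autoclose=True, each underlying file handle is released once the
    # dataset has been decoded; lazily loaded variables transparently reopen
    # the file on access, keeping the number of open descriptors bounded.
    paths = ['data_%04d.nc' % i for i in range(5000)]
    datasets = [xr.open_dataset(p, engine='scipy', autoclose=True)
                for p in paths]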
20 changes: 13 additions & 7 deletions xarray/backends/api.py
@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
-import gzip
 import os.path
 from distutils.version import StrictVersion
 from glob import glob
@@ -133,7 +132,7 @@ def _protect_dataset_variables_inplace(dataset, cache):


 def open_dataset(filename_or_obj, group=None, decode_cf=True,
-                 mask_and_scale=True, decode_times=True,
+                 mask_and_scale=True, decode_times=True, autoclose=True,
                  concat_characters=True, decode_coords=True, engine=None,
                  chunks=None, lock=None, cache=None, drop_variables=None):
     """Load and decode a dataset from a file or file-like object.
@@ -163,6 +162,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
     decode_times : bool, optional
         If True, decode times encoded in the standard NetCDF datetime format
         into datetime objects. Otherwise, leave them encoded as numbers.
+    autoclose : bool, optional
+        If True, automatically close files to avoid OS Error of too many files
+        being open. However, this option doesn't work with streams, e.g.,
+        BytesIO.
     concat_characters : bool, optional
         If True, concatenate along the last dimension of character arrays to
         form string arrays. Dimensions will only be concatenated over (and
@@ -216,7 +219,7 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
     if cache is None:
         cache = chunks is None

-    def maybe_decode_store(store, lock=False):
+    def maybe_decode_store(store, lock=False, autoclose=True):
         ds = conventions.decode_cf(
             store, mask_and_scale=mask_and_scale, decode_times=decode_times,
             concat_characters=concat_characters, decode_coords=decode_coords,
@@ -251,7 +254,10 @@ def maybe_decode_store(store, lock=False):
         else:
             ds2 = ds

-        store.close()
+        # protect so that dataset isn't necessarily closed, e.g., streams
+        # like BytesIO can't be reopened
+        if autoclose:
+            store.close()

         return ds2

@@ -275,7 +281,7 @@ def maybe_decode_store(store, lock=False):
                                  "default engine or engine='scipy'")
             # if the string ends with .gz, then gunzip and open as netcdf file
             try:
-                store = backends.ScipyDataStore(gzip.open(filename_or_obj))
+                store = backends.ScipyDataStore(filename_or_obj, gzipfile=True)
             except TypeError as e:
                 # TODO: gzipped loading only works with NetCDF3 files.
                 if 'is not a valid NetCDF 3 file' in e.message:
@@ -303,15 +309,15 @@ def maybe_decode_store(store, lock=False):
         if lock is None:
             lock = _default_lock(filename_or_obj, engine)
         with close_on_error(store):
-            return maybe_decode_store(store, lock)
+            return maybe_decode_store(store, lock, autoclose)
     else:
         if engine is not None and engine != 'scipy':
             raise ValueError('can only read file-like objects with '
                              "default engine or engine='scipy'")
         # assume filename_or_obj is a file-like object
         store = backends.ScipyDataStore(filename_or_obj)

-    return maybe_decode_store(store)
+    return maybe_decode_store(store, autoclose=autoclose)
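A note on the gzip change above: previously open_dataset passed an already-open gzip stream into ScipyDataStore, so the opener captured by functools.partial could not recreate it once closed. Passing the filename plus gzipfile=True lets the store gunzip afresh on every (re)open. Usage is unchanged (path hypothetical):

    import xarray as xr

    # The .gz suffix still routes through the scipy backend; the store can
    # now be autoclosed and reopened because it re-gunzips from the path.
    ds = xr.open_dataset('example.nc.gz')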
90 changes: 62 additions & 28 deletions xarray/backends/scipy_.py
@@ -3,6 +3,7 @@
 from __future__ import print_function
 import functools
 from io import BytesIO
+import contextlib

 import numpy as np
 import warnings
@@ -37,30 +38,39 @@ def __init__(self, variable_name, datastore):

     @property
     def array(self):
-        return self.datastore.ds.variables[self.variable_name].data
+        with self.datastore.ensure_open():
+            return self.datastore.ds.variables[self.variable_name].data

     @property
     def dtype(self):
-        # always use native endianness
-        return np.dtype(self.array.dtype.kind + str(self.array.dtype.itemsize))
+        with self.datastore.ensure_open():
+            # always use native endianness
+            return np.dtype(self.array.dtype.kind +
+                            str(self.array.dtype.itemsize))

     def __getitem__(self, key):
-        data = super(ScipyArrayWrapper, self).__getitem__(key)
-        # Copy data if the source file is mmapped. This makes things consistent
-        # with the netCDF4 library by ensuring we can safely read arrays even
-        # after closing associated files.
-        copy = self.datastore.ds.use_mmap
-        data = np.array(data, dtype=self.dtype, copy=copy)
+        with self.datastore.ensure_open():
+            data = super(ScipyArrayWrapper, self).__getitem__(key)
+            # Copy data if the source file is mmapped.
+            # This makes things consistent
+            # with the netCDF4 library by ensuring
+            # we can safely read arrays even
+            # after closing associated files.
+            copy = self.datastore.ds.use_mmap
+            data = np.array(data, dtype=self.dtype, copy=copy)
         return data


-def _open_scipy_netcdf(filename, mode, mmap, version):
+def _open_scipy_netcdf(filename, mode, mmap, version, gzipfile):
     import scipy.io
+    import gzip

     if isinstance(filename, bytes) and filename.startswith(b'CDF'):
         # it's a NetCDF3 bytestring
         filename = BytesIO(filename)

+    if gzipfile:
+        filename = gzip.open(filename)
+
     return scipy.io.netcdf_file(filename, mode=mode, mmap=mmap,
                                 version=version)
@@ -74,7 +84,7 @@ class ScipyDataStore(WritableCFDataStore, DataStorePickleMixin):
     It only supports the NetCDF3 file-format.
     """
     def __init__(self, filename_or_obj, mode='r', format=None, group=None,
-                 writer=None, mmap=None):
+                 writer=None, mmap=None, gzipfile=False):
         import scipy
         import scipy.io
         if mode != 'r' and scipy.__version__ < '0.13':  # pragma: no cover
@@ -98,26 +108,45 @@ def __init__(self, filename_or_obj, mode='r', format=None, group=None,

         opener = functools.partial(_open_scipy_netcdf,
                                    filename=filename_or_obj,
-                                   mode=mode, mmap=mmap, version=version)
+                                   mode=mode, mmap=mmap, version=version,
+                                   gzipfile=gzipfile)
         self.ds = opener()
+        self._isopen = True
         self._opener = opener
         self._mode = mode

         super(ScipyDataStore, self).__init__(writer)

+    @contextlib.contextmanager
+    def ensure_open(self, autoclose=True):
+        if not self._isopen:
+            try:
+                self.ds = self._opener()
+                self._isopen = True
+                yield
+            finally:
+                if autoclose:
+                    self.close()
+        else:
+            yield
+
     def open_store_variable(self, name, var):
-        return Variable(var.dimensions, ScipyArrayWrapper(name, self),
-                        _decode_attrs(var._attributes))
+        with self.ensure_open():
+            return Variable(var.dimensions, ScipyArrayWrapper(name, self),
+                            _decode_attrs(var._attributes))

     def get_variables(self):
-        return FrozenOrderedDict((k, self.open_store_variable(k, v))
-                                 for k, v in iteritems(self.ds.variables))
+        with self.ensure_open():
+            return FrozenOrderedDict((k, self.open_store_variable(k, v))
+                                     for k, v in iteritems(self.ds.variables))

     def get_attrs(self):
-        return Frozen(_decode_attrs(self.ds._attributes))
+        with self.ensure_open():
+            return Frozen(_decode_attrs(self.ds._attributes))

     def get_dimensions(self):
-        return Frozen(self.ds.dimensions)
+        with self.ensure_open():
+            return Frozen(self.ds.dimensions)

     def get_encoding(self):
         encoding = {}
@@ -126,19 +155,21 @@ def get_encoding(self):
         return encoding

     def set_dimension(self, name, length):
-        if name in self.dimensions:
-            raise ValueError('%s does not support modifying dimensions'
-                             % type(self).__name__)
-        self.ds.createDimension(name, length)
+        with self.ensure_open(autoclose=False):
+            if name in self.dimensions:
+                raise ValueError('%s does not support modifying dimensions'
+                                 % type(self).__name__)
+            self.ds.createDimension(name, length)

     def _validate_attr_key(self, key):
         if not is_valid_nc3_name(key):
             raise ValueError("Not a valid attribute name")

     def set_attribute(self, key, value):
-        self._validate_attr_key(key)
-        value = encode_nc3_attr_value(value)
-        setattr(self.ds, key, value)
+        with self.ensure_open(autoclose=False):
+            self._validate_attr_key(key)
+            value = encode_nc3_attr_value(value)
+            setattr(self.ds, key, value)

     def prepare_variable(self, name, variable, check_encoding=False,
                          unlimited_dims=None):
@@ -163,11 +194,13 @@ def prepare_variable(self, name, variable, check_encoding=False,
         return scipy_var, data

     def sync(self):
-        super(ScipyDataStore, self).sync()
-        self.ds.flush()
+        with self.ensure_open():
+            super(ScipyDataStore, self).sync()
+            self.ds.flush()

     def close(self):
         self.ds.close()
+        self._isopen = False

     def __exit__(self, type, value, tb):
         self.close()
@@ -179,3 +212,4 @@ def __setstate__(self, state):
         # seek to the start of the file so scipy can read it
         filename.seek(0)
         super(ScipyDataStore, self).__setstate__(state)
+        self._isopen = True
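The heart of the fix is the ensure_open context manager: read paths reopen the file on demand and, by default, close it again on exit, while write paths use ensure_open(autoclose=False) so the file stays open mid-operation. A self-contained sketch of the same pattern outside xarray (class and method names here are illustrative, not part of the commit):

    import contextlib
    import functools

    class ReopeningStore(object):
        """Illustrative stand-in for a data store that lazily reopens its
        underlying file, mirroring ensure_open in the diff above."""

        def __init__(self, path):
            self._opener = functools.partial(open, path, 'rb')
            self._file = self._opener()
            self._isopen = True

        @contextlib.contextmanager
        def ensure_open(self, autoclose=True):
            # Reopen on demand; by default, close again on exit so the
            # number of simultaneously open handles stays bounded.
            if not self._isopen:
                try:
                    self._file = self._opener()
                    self._isopen = True
                    yield
                finally:
                    if autoclose:
                        self.close()
            else:
                yield

        def close(self):
            self._file.close()
            self._isopen = False

        def read_all(self):
            with self.ensure_open():
                return self._file.read()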
2 changes: 1 addition & 1 deletion xarray/tests/test_backends.py
@@ -784,7 +784,7 @@ def roundtrip(self, data, save_kwargs={}, open_kwargs={},

     def test_bytesio_pickle(self):
         data = Dataset({'foo': ('x', [1, 2, 3])})
         fobj = BytesIO(data.to_netcdf())
-        with open_dataset(fobj) as ds:
+        with open_dataset(fobj, autoclose=False) as ds:
             unpickled = pickle.loads(pickle.dumps(ds))
             self.assertDatasetIdentical(unpickled, data)
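The autoclose=False here matters because a BytesIO object has no backing filename: once closed it cannot be reopened by the stored opener, and any further read fails. A short illustration using only the standard library:

    from io import BytesIO

    buf = BytesIO(b'CDF...')  # stand-in for an in-memory NetCDF3 image
    buf.close()
    buf.read()  # raises ValueError: I/O operation on closed file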
