Fixes open_mfdataset too many open files error (#1198)
Includes tests that demonstrate the OSError raised when too many
files are opened, as encountered when using open_mfdataset.

Fixed for the following backends:
 * netCDF4 backend
 * scipy backend
 * pynio backend

Following correspondence with @shoyer, open/close operations under
h5netcdf appear to trigger an error within the h5netcdf library
itself. Because these issues remain unresolved, autoclose support
for h5netcdf is currently disabled.

Note that `autoclose=False` is the default for open_mfdataset, so
standard behavior is unchanged unless `autoclose=True` is passed.

This default favors standard xarray performance over unconditionally
avoiding the OSError raised when too many files are opened via
open_mfdataset.
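
A minimal usage sketch of the new flag (not part of this commit; the
glob pattern is a placeholder):

```python
# Hypothetical usage sketch: 'output/run_*.nc' is a placeholder pattern.
import xarray as xr

# With autoclose=True, each file is closed once its data has been read
# and is transparently reopened on the next access, so only a few files
# are open at any one time, avoiding the "too many open files" OSError.
ds = xr.open_mfdataset('output/run_*.nc', autoclose=True)
```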
pwolfram authored and shoyer committed Mar 23, 2017
1 parent b3fc6c4 commit 371d034
Showing 11 changed files with 623 additions and 205 deletions.
6 changes: 6 additions & 0 deletions doc/whats-new.rst
@@ -35,6 +35,12 @@ Added new method :py:meth:`~Dataset.assign_attrs` to ``DataArray`` and
``dict.update`` method on attrs (:issue:`1281`).
By `Henry S. Harrison <https://hsharrison.github.io>`_.

- It is now possible to pass ``autoclose=True`` to
:py:func:`~xarray.open_mfdataset` to explicitly close opened files when not
in use, preventing an OSError caused by too many open files.
Note that the default is ``autoclose=False``, which is consistent with
previous xarray behavior. By `Phillip J. Wolfram <https://github.com/pwolfram>`_.

Bug fixes
~~~~~~~~~
- ``rolling`` now keeps its original dimension order (:issue:`1125`).
66 changes: 38 additions & 28 deletions xarray/backends/api.py
@@ -1,7 +1,6 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gzip
import os.path
from distutils.version import LooseVersion
from glob import glob
@@ -133,7 +132,7 @@ def _protect_dataset_variables_inplace(dataset, cache):


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True,
mask_and_scale=True, decode_times=True, autoclose=False,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=None, cache=None, drop_variables=None):
"""Load and decode a dataset from a file or file-like object.
@@ -163,6 +162,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
decode_times : bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
autoclose : bool, optional
If True, automatically close files to avoid an OSError from too many
open files. Note that this option does not work with streams, e.g.,
BytesIO.
concat_characters : bool, optional
If True, concatenate along the last dimension of character arrays to
form string arrays. Dimensions will only be concatenated over (and
@@ -251,6 +254,12 @@ def maybe_decode_store(store, lock=False):
else:
ds2 = ds

# protect so that the dataset store isn't unconditionally closed, e.g.,
# streams like BytesIO can't be reopened;
# the datastore backend is responsible for determining this capability
if store._autoclose:
store.close()

return ds2

if isinstance(filename_or_obj, backends.AbstractDataStore):
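
A hedged illustration of the stream caveat above (the file name is a
placeholder): a ``BytesIO`` object cannot be reopened once closed, so
stores backed by streams keep their handle open and should be used with
the default ``autoclose=False``.

```python
# Sketch of the stream limitation ('saved.nc' is a placeholder path).
from io import BytesIO
import xarray as xr

with open('saved.nc', 'rb') as f:
    stream = BytesIO(f.read())

# A stream cannot be reopened after closing, so its store must stay
# open; leave autoclose at its default of False when reading a stream.
# (The scipy backend reads NetCDF3 data from file-like objects.)
ds = xr.open_dataset(stream)
```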
@@ -271,33 +280,30 @@ def maybe_decode_store(store, lock=False):
if engine is not None and engine != 'scipy':
raise ValueError('can only read gzipped netCDF files with '
"default engine or engine='scipy'")
# if the string ends with .gz, then gunzip and open as netcdf file
try:
store = backends.ScipyDataStore(gzip.open(filename_or_obj))
except TypeError as e:
# TODO: gzipped loading only works with NetCDF3 files.
if 'is not a valid NetCDF 3 file' in e.message:
raise ValueError('gzipped file loading only supports '
'NetCDF 3 files.')
else:
raise
else:
if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore(filename_or_obj, group=group)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj)
elif engine == 'pydap':
store = backends.PydapDataStore(filename_or_obj)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)
engine = 'scipy'

if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore(filename_or_obj, group=group,
autoclose=autoclose)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj,
autoclose=autoclose)
elif engine == 'pydap':
store = backends.PydapDataStore(filename_or_obj)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group,
autoclose=autoclose)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj,
autoclose=autoclose)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)

if lock is None:
lock = _default_lock(filename_or_obj, engine)
with close_on_error(store):
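
A short sketch of the dispatch above (placeholder file names; the
``.gz`` decompression itself is presumably handled by the scipy backend
changes not shown in this excerpt):

```python
# Placeholder file names; illustrates the engine dispatch above.
import xarray as xr

# The engine can be chosen explicitly; the store receives autoclose.
ds1 = xr.open_dataset('data.nc', engine='netcdf4', autoclose=True)

# A '.gz' suffix now simply forces engine='scipy' (NetCDF3 only).
ds2 = xr.open_dataset('data3.nc.gz')
```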
@@ -479,6 +485,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
'netcdf4'.
autoclose : bool, optional
If True, automatically close files to avoid an OSError from too many
open files. Note that this option does not work with streams, e.g.,
BytesIO.
lock : False, True or threading.Lock, optional
This argument is passed on to :py:func:`dask.array.from_array`. By
default, a per-variable lock is used when reading data from netCDF
39 changes: 37 additions & 2 deletions xarray/backends/common.py
@@ -5,6 +5,7 @@
import logging
import time
import traceback
import contextlib
from collections import Mapping
from distutils.version import LooseVersion

@@ -41,6 +42,15 @@ def _decode_variable_name(name):
return name


def find_root(ds):
"""
Helper function to find the root group of a netCDF4 or h5netcdf dataset.
"""
while ds.parent is not None:
ds = ds.parent
return ds


def robust_getitem(array, key, catch=Exception, max_retries=6,
initial_delay=500):
"""
@@ -67,6 +77,7 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,


class AbstractDataStore(Mapping):
_autoclose = False

def __iter__(self):
return iter(self.variables)
@@ -107,8 +118,8 @@ def load(self):
This function will be called anytime variables or attributes
are requested, so care should be taken to make sure it's fast.
"""
variables = FrozenOrderedDict((_decode_variable_name(k), v) for k, v in
iteritems(self.get_variables()))
variables = FrozenOrderedDict((_decode_variable_name(k), v)
for k, v in self.get_variables().items())
attributes = FrozenOrderedDict(self.get_attrs())
return variables, attributes

@@ -252,3 +263,27 @@ def __getstate__(self):
def __setstate__(self, state):
self.__dict__.update(state)
self.ds = self._opener(mode=self._mode)

@contextlib.contextmanager
def ensure_open(self, autoclose):
"""
Context manager to make sure datasets are closed and opened
at appropriate times to avoid too-many-open-files errors.
Automatic closing requires the `autoclose=True` argument to `open_mfdataset`.
"""
if self._autoclose and not self._isopen:
try:
self.ds = self._opener()
self._isopen = True
yield
finally:
if autoclose:
self.close()
else:
yield

def assert_open(self):
if not self._isopen:
raise AssertionError('internal failure: file must be open '
'if `autoclose=True` is used.')
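
The access pattern the stores in this commit follow, shown here as a
minimal sketch: every read of ``self.ds`` is wrapped in ``ensure_open``,
so a store constructed with ``autoclose=True`` reopens its file just for
the call and closes it again on exit.

```python
# Sketch of the access pattern used by the backend stores in this commit.
def get_dimensions(self):
    # Reopens self.ds if the store was created with autoclose=True and
    # the file is currently closed; closes it again when the block exits.
    with self.ensure_open(autoclose=True):
        return self.ds.dimensions
```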
78 changes: 50 additions & 28 deletions xarray/backends/h5netcdf_.py
@@ -6,14 +6,20 @@

from .. import Variable
from ..core import indexing
from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
from ..core.utils import FrozenOrderedDict, close_on_error
from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict

from .common import WritableCFDataStore, DataStorePickleMixin
from .common import WritableCFDataStore, DataStorePickleMixin, find_root
from .netCDF4_ import (_nc4_group, _nc4_values_and_dtype,
_extract_nc4_variable_encoding, BaseNetCDF4Array)


class H5NetCDFArrayWrapper(BaseNetCDF4Array):
def __getitem__(self, key):
with self.datastore.ensure_open(autoclose=True):
return self.get_array()[key]


def maybe_decode_bytes(txt):
if isinstance(txt, bytes_type):
return txt.decode('utf-8')
@@ -49,49 +55,63 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
"""Store for reading and writing data via h5netcdf
"""
def __init__(self, filename, mode='r', format=None, group=None,
writer=None):
writer=None, autoclose=False):
if format not in [None, 'NETCDF4']:
raise ValueError('invalid format for h5netcdf backend')
opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
group=group)
self.ds = opener()
if autoclose:
raise NotImplementedError('autoclose=True is not implemented '
'for the h5netcdf backend pending further '
'exploration, e.g., bug fixes (in h5netcdf?)')
self._autoclose = False
self._isopen = True
self.format = format
self._opener = opener
self._filename = filename
self._mode = mode
super(H5NetCDFStore, self).__init__(writer)

def open_store_variable(self, name, var):
dimensions = var.dimensions
data = indexing.LazilyIndexedArray(BaseNetCDF4Array(name, self))
attrs = _read_attributes(var)

# netCDF4 specific encoding
encoding = dict(var.filters())
chunking = var.chunking()
encoding['chunksizes'] = chunking if chunking != 'contiguous' else None

# save source so __repr__ can detect if it's local or not
encoding['source'] = self._filename
encoding['original_shape'] = var.shape
with self.ensure_open(autoclose=False):
dimensions = var.dimensions
data = indexing.LazilyIndexedArray(
H5NetCDFArrayWrapper(name, self))
attrs = _read_attributes(var)

# netCDF4 specific encoding
encoding = dict(var.filters())
chunking = var.chunking()
encoding['chunksizes'] = chunking \
if chunking != 'contiguous' else None

# save source so __repr__ can detect if it's local or not
encoding['source'] = self._filename
encoding['original_shape'] = var.shape

return Variable(dimensions, data, attrs, encoding)

def get_variables(self):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in iteritems(self.ds.variables))
with self.ensure_open(autoclose=False):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in iteritems(self.ds.variables))

def get_attrs(self):
return Frozen(_read_attributes(self.ds))
with self.ensure_open(autoclose=True):
return FrozenOrderedDict(_read_attributes(self.ds))

def get_dimensions(self):
return self.ds.dimensions
with self.ensure_open(autoclose=True):
return self.ds.dimensions

def set_dimension(self, name, length):
self.ds.createDimension(name, size=length)
with self.ensure_open(autoclose=False):
self.ds.createDimension(name, size=length)

def set_attribute(self, key, value):
self.ds.setncattr(key, value)
with self.ensure_open(autoclose=False):
self.ds.setncattr(key, value)

def prepare_variable(self, name, variable, check_encoding=False,
unlimited_dims=None):
@@ -129,12 +149,28 @@ def prepare_variable(self, name, variable, check_encoding=False,
return nc4_var, variable.data

def sync(self):
super(H5NetCDFStore, self).sync()
self.ds.sync()
with self.ensure_open(autoclose=True):
super(H5NetCDFStore, self).sync()
self.ds.sync()

def close(self):
ds = self.ds
# netCDF4 only allows closing the root group
while ds.parent is not None:
ds = ds.parent
ds.close()
if self._isopen:
# netCDF4 only allows closing the root group
ds = find_root(self.ds)
if not ds._closed:
ds.close()
self._isopen = False
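
Given the guard in ``H5NetCDFStore.__init__`` above, requesting
autoclose with the h5netcdf engine fails fast; a hedged sketch of the
expected behavior (placeholder path):

```python
# Placeholder path; demonstrates the guard in H5NetCDFStore.__init__.
import xarray as xr

xr.open_dataset('data.nc', engine='h5netcdf', autoclose=True)
# NotImplementedError: autoclose=True is not implemented for the
# h5netcdf backend pending further exploration, e.g., bug fixes
# (in h5netcdf?)
```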