Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes OS error arising from too many files open #1198

Merged
merged 1 commit into from
Mar 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ Added new method :py:meth:`~Dataset.assign_attrs` to ``DataArray`` and
``dict.update`` method on attrs (:issue:`1281`).
By `Henry S. Harrison <https://hsharrison.github.io>`_.

- It is now possible to set the ``autoclose=True`` argument to
:py:func:`~xarray.open_mfdataset` to explicitly close opened files when not
in use to prevent occurrence of an OS Error related to too many open files.
Note, the default is ``autoclose=False``, which is consistent with previous
xarray behavior. By `Phillip J. Wolfram <https://github.com/pwolfram>`_.

Bug fixes
~~~~~~~~~
- ``rolling`` now keeps its original dimension order (:issue:`1125`).
Expand Down
66 changes: 38 additions & 28 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gzip
import os.path
from distutils.version import LooseVersion
from glob import glob
Expand Down Expand Up @@ -133,7 +132,7 @@ def _protect_dataset_variables_inplace(dataset, cache):


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True,
mask_and_scale=True, decode_times=True, autoclose=False,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=None, cache=None, drop_variables=None):
"""Load and decode a dataset from a file or file-like object.
Expand Down Expand Up @@ -163,6 +162,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
decode_times : bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
concat_characters : bool, optional
If True, concatenate along the last dimension of character arrays to
form string arrays. Dimensions will only be concatenated over (and
Expand Down Expand Up @@ -251,6 +254,12 @@ def maybe_decode_store(store, lock=False):
else:
ds2 = ds

# protect so that dataset store isn't necessarily closed, e.g.,
# streams like BytesIO can't be reopened
# datastore backend is responsible for determining this capability
if store._autoclose:
store.close()

return ds2

if isinstance(filename_or_obj, backends.AbstractDataStore):
Expand All @@ -271,33 +280,30 @@ def maybe_decode_store(store, lock=False):
if engine is not None and engine != 'scipy':
raise ValueError('can only read gzipped netCDF files with '
"default engine or engine='scipy'")
# if the string ends with .gz, then gunzip and open as netcdf file
try:
store = backends.ScipyDataStore(gzip.open(filename_or_obj))
except TypeError as e:
# TODO: gzipped loading only works with NetCDF3 files.
if 'is not a valid NetCDF 3 file' in e.message:
raise ValueError('gzipped file loading only supports '
'NetCDF 3 files.')
else:
raise
else:
if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore(filename_or_obj, group=group)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj)
elif engine == 'pydap':
store = backends.PydapDataStore(filename_or_obj)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)
engine = 'scipy'

if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore(filename_or_obj, group=group,
autoclose=autoclose)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj,
autoclose=autoclose)
elif engine == 'pydap':
store = backends.PydapDataStore(filename_or_obj)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group,
autoclose=autoclose)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj,
autoclose=autoclose)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)

if lock is None:
lock = _default_lock(filename_or_obj, engine)
with close_on_error(store):
Expand Down Expand Up @@ -479,6 +485,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
'netcdf4'.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
lock : False, True or threading.Lock, optional
This argument is passed on to :py:func:`dask.array.from_array`. By
default, a per-variable lock is used when reading data from netCDF
Expand Down
39 changes: 37 additions & 2 deletions xarray/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time
import traceback
import contextlib
from collections import Mapping
from distutils.version import LooseVersion

Expand Down Expand Up @@ -41,6 +42,15 @@ def _decode_variable_name(name):
return name


def find_root(ds):
"""
Helper function to find the root of a netcdf or h5netcdf dataset.
"""
while ds.parent is not None:
ds = ds.parent
return ds


def robust_getitem(array, key, catch=Exception, max_retries=6,
initial_delay=500):
"""
Expand All @@ -67,6 +77,7 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,


class AbstractDataStore(Mapping):
_autoclose = False

def __iter__(self):
return iter(self.variables)
Expand Down Expand Up @@ -107,8 +118,8 @@ def load(self):
This function will be called anytime variables or attributes
are requested, so care should be taken to make sure its fast.
"""
variables = FrozenOrderedDict((_decode_variable_name(k), v) for k, v in
iteritems(self.get_variables()))
variables = FrozenOrderedDict((_decode_variable_name(k), v)
for k, v in self.get_variables().items())
attributes = FrozenOrderedDict(self.get_attrs())
return variables, attributes

Expand Down Expand Up @@ -252,3 +263,27 @@ def __getstate__(self):
def __setstate__(self, state):
self.__dict__.update(state)
self.ds = self._opener(mode=self._mode)

@contextlib.contextmanager
def ensure_open(self, autoclose):
"""
Helper function to make sure datasets are closed and opened
at appropriate times to avoid too many open file errors.
Use requires `autoclose=True` argument to `open_mfdataset`.
"""
if self._autoclose and not self._isopen:
try:
self.ds = self._opener()
self._isopen = True
yield
finally:
if autoclose:
self.close()
else:
yield

def assert_open(self):
if not self._isopen:
raise AssertionError('internal failure: file must be open '
'if `autoclose=True` is used.')
78 changes: 50 additions & 28 deletions xarray/backends/h5netcdf_.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@

from .. import Variable
from ..core import indexing
from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
from ..core.utils import FrozenOrderedDict, close_on_error
from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict

from .common import WritableCFDataStore, DataStorePickleMixin
from .common import WritableCFDataStore, DataStorePickleMixin, find_root
from .netCDF4_ import (_nc4_group, _nc4_values_and_dtype,
_extract_nc4_variable_encoding, BaseNetCDF4Array)


class H5NetCDFArrayWrapper(BaseNetCDF4Array):
def __getitem__(self, key):
with self.datastore.ensure_open(autoclose=True):
return self.get_array()[key]


def maybe_decode_bytes(txt):
if isinstance(txt, bytes_type):
return txt.decode('utf-8')
Expand Down Expand Up @@ -49,49 +55,63 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
"""Store for reading and writing data via h5netcdf
"""
def __init__(self, filename, mode='r', format=None, group=None,
writer=None):
writer=None, autoclose=False):
if format not in [None, 'NETCDF4']:
raise ValueError('invalid format for h5netcdf backend')
opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
group=group)
self.ds = opener()
if autoclose:
raise NotImplemented('autoclose=True is not implemented '
'for the h5netcdf backend pending further '
'exploration, e.g., bug fixes (in h5netcdf?)')
self._autoclose = False
self._isopen = True
self.format = format
self._opener = opener
self._filename = filename
self._mode = mode
super(H5NetCDFStore, self).__init__(writer)

def open_store_variable(self, name, var):
dimensions = var.dimensions
data = indexing.LazilyIndexedArray(BaseNetCDF4Array(name, self))
attrs = _read_attributes(var)

# netCDF4 specific encoding
encoding = dict(var.filters())
chunking = var.chunking()
encoding['chunksizes'] = chunking if chunking != 'contiguous' else None

# save source so __repr__ can detect if it's local or not
encoding['source'] = self._filename
encoding['original_shape'] = var.shape
with self.ensure_open(autoclose=False):
dimensions = var.dimensions
data = indexing.LazilyIndexedArray(
H5NetCDFArrayWrapper(name, self))
attrs = _read_attributes(var)

# netCDF4 specific encoding
encoding = dict(var.filters())
chunking = var.chunking()
encoding['chunksizes'] = chunking \
if chunking != 'contiguous' else None

# save source so __repr__ can detect if it's local or not
encoding['source'] = self._filename
encoding['original_shape'] = var.shape

return Variable(dimensions, data, attrs, encoding)

def get_variables(self):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in iteritems(self.ds.variables))
with self.ensure_open(autoclose=False):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in iteritems(self.ds.variables))

def get_attrs(self):
return Frozen(_read_attributes(self.ds))
with self.ensure_open(autoclose=True):
return FrozenOrderedDict(_read_attributes(self.ds))

def get_dimensions(self):
return self.ds.dimensions
with self.ensure_open(autoclose=True):
return self.ds.dimensions

def set_dimension(self, name, length):
self.ds.createDimension(name, size=length)
with self.ensure_open(autoclose=False):
self.ds.createDimension(name, size=length)

def set_attribute(self, key, value):
self.ds.setncattr(key, value)
with self.ensure_open(autoclose=False):
self.ds.setncattr(key, value)

def prepare_variable(self, name, variable, check_encoding=False,
unlimited_dims=None):
Expand Down Expand Up @@ -129,12 +149,14 @@ def prepare_variable(self, name, variable, check_encoding=False,
return nc4_var, variable.data

def sync(self):
super(H5NetCDFStore, self).sync()
self.ds.sync()
with self.ensure_open(autoclose=True):
super(H5NetCDFStore, self).sync()
self.ds.sync()

def close(self):
ds = self.ds
# netCDF4 only allows closing the root group
while ds.parent is not None:
ds = ds.parent
ds.close()
if self._isopen:
# netCDF4 only allows closing the root group
ds = find_root(self.ds)
if not ds._closed:
ds.close()
self._isopen = False
Loading