Skip to content

Commit

Permalink
Fixes open_mfdataset too many open file error
Browse files Browse the repository at this point in the history
Includes testing to demonstrate an OSError associated
with opening too many files as encountered
using open_mfdataset.

Fixed for the following backends:
 * netCDF4 backend
 * scipy backend
 * pynio backend

Open/close operations on h5netcdf appear to have an
error associated with the h5netcdf library following
correspondence with @shoyer.
Thus, there are still challenges with h5netcdf;
hence, support for h5netcdf is currently disabled.

Note, by default `autoclose=False` for open_mfdataset so standard
behavior is unchanged unless `autoclose=True`.

This choice of default is to select standard xarray performance over
general removal of the OSError associated with opening too many files as
encountered using open_mfdataset.
  • Loading branch information
pwolfram committed Mar 23, 2017
1 parent 93d6963 commit be6ab05
Show file tree
Hide file tree
Showing 11 changed files with 608 additions and 205 deletions.
6 changes: 6 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ v0.9.2 (unreleased)
Enhancements
~~~~~~~~~~~~

- It is now possible to set the ``autoclose=True`` argument to
:py:func:`~xarray.open_mfdataset` to explicitly close opened files when not
in use to prevent occurrence of an OS Error related to too many open files.
Note, the default is ``autoclose=False``, which is consistent with previous
xarray behavior. By `Phillip J. Wolfram <https://github.com/pwolfram>`_.

Bug fixes
~~~~~~~~~

Expand Down
66 changes: 38 additions & 28 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import gzip
import os.path
from distutils.version import StrictVersion
from glob import glob
Expand Down Expand Up @@ -133,7 +132,7 @@ def _protect_dataset_variables_inplace(dataset, cache):


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True,
mask_and_scale=True, decode_times=True, autoclose=False,
concat_characters=True, decode_coords=True, engine=None,
chunks=None, lock=None, cache=None, drop_variables=None):
"""Load and decode a dataset from a file or file-like object.
Expand Down Expand Up @@ -163,6 +162,10 @@ def open_dataset(filename_or_obj, group=None, decode_cf=True,
decode_times : bool, optional
If True, decode times encoded in the standard NetCDF datetime format
into datetime objects. Otherwise, leave them encoded as numbers.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
concat_characters : bool, optional
If True, concatenate along the last dimension of character arrays to
form string arrays. Dimensions will only be concatenated over (and
Expand Down Expand Up @@ -251,6 +254,12 @@ def maybe_decode_store(store, lock=False):
else:
ds2 = ds

# protect so that dataset store isn't necessarily closed, e.g.,
# streams like BytesIO can't be reopened
# datastore backend is responsible for determining this capability
if store._autoclose:
store.close()

return ds2

if isinstance(filename_or_obj, backends.AbstractDataStore):
Expand All @@ -271,33 +280,30 @@ def maybe_decode_store(store, lock=False):
if engine is not None and engine != 'scipy':
raise ValueError('can only read gzipped netCDF files with '
"default engine or engine='scipy'")
# if the string ends with .gz, then gunzip and open as netcdf file
try:
store = backends.ScipyDataStore(gzip.open(filename_or_obj))
except TypeError as e:
# TODO: gzipped loading only works with NetCDF3 files.
if 'is not a valid NetCDF 3 file' in e.message:
raise ValueError('gzipped file loading only supports '
'NetCDF 3 files.')
else:
raise
else:
if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore(filename_or_obj, group=group)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj)
elif engine == 'pydap':
store = backends.PydapDataStore(filename_or_obj)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)
engine = 'scipy'

if engine is None:
engine = _get_default_engine(filename_or_obj,
allow_remote=True)
if engine == 'netcdf4':
store = backends.NetCDF4DataStore(filename_or_obj, group=group,
autoclose=autoclose)
elif engine == 'scipy':
store = backends.ScipyDataStore(filename_or_obj,
autoclose=autoclose)
elif engine == 'pydap':
store = backends.PydapDataStore(filename_or_obj)
elif engine == 'h5netcdf':
store = backends.H5NetCDFStore(filename_or_obj, group=group,
autoclose=autoclose)
elif engine == 'pynio':
store = backends.NioDataStore(filename_or_obj,
autoclose=autoclose)
else:
raise ValueError('unrecognized engine for open_dataset: %r'
% engine)

if lock is None:
lock = _default_lock(filename_or_obj, engine)
with close_on_error(store):
Expand Down Expand Up @@ -479,6 +485,10 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
Engine to use when reading files. If not provided, the default engine
is chosen based on available dependencies, with a preference for
'netcdf4'.
autoclose : bool, optional
If True, automatically close files to avoid OS Error of too many files
being open. However, this option doesn't work with streams, e.g.,
BytesIO.
lock : False, True or threading.Lock, optional
This argument is passed on to :py:func:`dask.array.from_array`. By
default, a per-variable lock is used when reading data from netCDF
Expand Down
37 changes: 35 additions & 2 deletions xarray/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time
import traceback
import contextlib
from collections import Mapping
from distutils.version import StrictVersion

Expand Down Expand Up @@ -40,6 +41,14 @@ def _decode_variable_name(name):
name = None
return name

def find_root(ds):
"""
Helper function to find the root of a netcdf or h5netcdf dataset.
"""
while ds.parent is not None:
ds = ds.parent
return ds


def robust_getitem(array, key, catch=Exception, max_retries=6,
initial_delay=500):
Expand Down Expand Up @@ -67,6 +76,7 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,


class AbstractDataStore(Mapping):
_autoclose = False

def __iter__(self):
return iter(self.variables)
Expand Down Expand Up @@ -107,8 +117,8 @@ def load(self):
This function will be called anytime variables or attributes
are requested, so care should be taken to make sure its fast.
"""
variables = FrozenOrderedDict((_decode_variable_name(k), v) for k, v in
iteritems(self.get_variables()))
variables = FrozenOrderedDict((_decode_variable_name(k), v)
for k, v in self.get_variables().items())
attributes = FrozenOrderedDict(self.get_attrs())
return variables, attributes

Expand Down Expand Up @@ -252,3 +262,26 @@ def __getstate__(self):
def __setstate__(self, state):
self.__dict__.update(state)
self.ds = self._opener(mode=self._mode)

@contextlib.contextmanager
def ensure_open(self, autoclose):
"""
Helper function to make sure datasets are closed and opened
at appropriate times to avoid too many open file errors.
Use requires `autoclose=True` argument to `open_mfdataset`.
"""
if self._autoclose and not self._isopen:
try:
self.ds = self._opener()
self._isopen = True
yield
finally:
if autoclose:
self.close()
else:
yield

def assert_open(self):
if not self._isopen:
raise AssertionError('internal failure: file must be open if `autoclose=True` is used.')
74 changes: 49 additions & 25 deletions xarray/backends/h5netcdf_.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@
from __future__ import division
from __future__ import print_function
import functools
import operator
import warnings

import numpy as np

from .. import Variable
from ..core import indexing
from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict

from .common import WritableCFDataStore, DataStorePickleMixin
from .common import WritableCFDataStore, DataStorePickleMixin, find_root
from .netCDF4_ import (_nc4_group, _nc4_values_and_dtype,
_extract_nc4_variable_encoding, BaseNetCDF4Array)


class H5NetCDFFArrayWrapper(BaseNetCDF4Array):
def __getitem__(self, key):
with self.datastore.ensure_open(autoclose=True):
return self.get_array()[key]


def maybe_decode_bytes(txt):
if isinstance(txt, bytes_type):
return txt.decode('utf-8')
Expand Down Expand Up @@ -49,49 +58,62 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
"""Store for reading and writing data via h5netcdf
"""
def __init__(self, filename, mode='r', format=None, group=None,
writer=None):
writer=None, autoclose=False):
if format not in [None, 'NETCDF4']:
raise ValueError('invalid format for h5netcdf backend')
opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
group=group)
self.ds = opener()
if autoclose:
raise NotImplemented('autoclose=True is not implemented '
'for the h5netcdf backend pending further '
'exploration, e.g., bug fixes (in h5netcdf?)')
self._autoclose = False
self._isopen = True
self.format = format
self._opener = opener
self._filename = filename
self._mode = mode
super(H5NetCDFStore, self).__init__(writer)

def open_store_variable(self, name, var):
dimensions = var.dimensions
data = indexing.LazilyIndexedArray(BaseNetCDF4Array(name, self))
attrs = _read_attributes(var)
with self.ensure_open(autoclose=False):
dimensions = var.dimensions
data = indexing.LazilyIndexedArray(
H5NetCDFFArrayWrapper(name, self))
attrs = _read_attributes(var)

# netCDF4 specific encoding
encoding = dict(var.filters())
chunking = var.chunking()
encoding['chunksizes'] = chunking if chunking != 'contiguous' else None
# netCDF4 specific encoding
encoding = dict(var.filters())
chunking = var.chunking()
encoding['chunksizes'] = chunking if chunking != 'contiguous' else None

# save source so __repr__ can detect if it's local or not
encoding['source'] = self._filename
encoding['original_shape'] = var.shape
# save source so __repr__ can detect if it's local or not
encoding['source'] = self._filename
encoding['original_shape'] = var.shape

return Variable(dimensions, data, attrs, encoding)

def get_variables(self):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in iteritems(self.ds.variables))
with self.ensure_open(autoclose=False):
return FrozenOrderedDict((k, self.open_store_variable(k, v))
for k, v in iteritems(self.ds.variables))

def get_attrs(self):
return Frozen(_read_attributes(self.ds))
with self.ensure_open(autoclose=True):
return FrozenOrderedDict(_read_attributes(self.ds))

def get_dimensions(self):
return self.ds.dimensions
with self.ensure_open(autoclose=True):
return self.ds.dimensions

def set_dimension(self, name, length):
self.ds.createDimension(name, size=length)
with self.ensure_open(autoclose=False):
self.ds.createDimension(name, size=length)

def set_attribute(self, key, value):
self.ds.setncattr(key, value)
with self.ensure_open(autoclose=False):
self.ds.setncattr(key, value)

def prepare_variable(self, name, variable, check_encoding=False,
unlimited_dims=None):
Expand Down Expand Up @@ -129,12 +151,14 @@ def prepare_variable(self, name, variable, check_encoding=False,
return nc4_var, variable.data

def sync(self):
super(H5NetCDFStore, self).sync()
self.ds.sync()
with self.ensure_open(autoclose=True):
super(H5NetCDFStore, self).sync()
self.ds.sync()

def close(self):
ds = self.ds
# netCDF4 only allows closing the root group
while ds.parent is not None:
ds = ds.parent
ds.close()
if self._isopen:
# netCDF4 only allows closing the root group
ds = find_root(self.ds)
if not ds._closed:
ds.close()
self._isopen = False
Loading

0 comments on commit be6ab05

Please sign in to comment.