From 4ec6f4c722dbc8d0395a198ffd783738475641f6 Mon Sep 17 00:00:00 2001
From: "Phillip J. Wolfram"
Date: Wed, 11 Jan 2017 09:33:36 -0700
Subject: [PATCH] Fixes too many files open error: h5netcdf backend

---
Reviewer notes (this section is not part of the commit message or the diff):

 * Line breaks and indentation were restored so that every hunk matches the
   line counts in its "@@" header and the diffstat below.
 * H5NetcdFArrayWrapper.__getitem__ originally tested "if not PY3:", but no
   hunk of this patch imports PY3 into h5netcdf_.py, so the IndexError path
   would have raised NameError. The test is now "str is bytes" (true only on
   Python 2), which needs no import. The commit id above and the blob ids on
   the "index" line predate this one-line change.
 * ensure_open() reopens the dataset through self._opener when
   self._isopen is False and, with autoclose=True, closes it again when the
   "with" block exits; if the store is already open it yields without
   touching it. set_dimension() and set_attribute() pass autoclose=False so
   a store reopened for them stays open afterwards.
 * close() is now a no-op when the store is already closed.

 xarray/backends/h5netcdf_.py | 99 +++++++++++++++++++++++++++---------
 1 file changed, 75 insertions(+), 24 deletions(-)

diff --git a/xarray/backends/h5netcdf_.py b/xarray/backends/h5netcdf_.py
index acb46bee14c..88be4ab4ac9 100644
--- a/xarray/backends/h5netcdf_.py
+++ b/xarray/backends/h5netcdf_.py
@@ -2,18 +2,46 @@
 from __future__ import division
 from __future__ import print_function
 import functools
+import operator
 import warnings
+import contextlib
+
+import numpy as np
 
 from .. import Variable
 from ..core import indexing
 from ..core.utils import FrozenOrderedDict, close_on_error, Frozen
 from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict
 
-from .common import WritableCFDataStore, DataStorePickleMixin
+from .common import WritableCFDataStore, DataStorePickleMixin, find_root
 from .netCDF4_ import (_nc4_group, _nc4_values_and_dtype,
                        _extract_nc4_variable_encoding, BaseNetCDF4Array)
 
 
+class H5NetcdFArrayWrapper(BaseNetCDF4Array):
+    def __getitem__(self, key):
+        with self.datastore.ensure_open():
+            try:
+                data = operator.getitem(self.array, key)
+            except IndexError:
+                # Catch IndexError in netCDF4 and return a more informative error
+                # message. This is most often called when an unsorted indexer is
+                # used before the data is loaded from disk.
+                msg = ('The indexing operation you are attempting to perform is '
+                       'not valid on h5netcdf.Variable object. Try loading your '
+                       'data into memory first by calling .load().')
+                if str is bytes:  # Python 2 only (PY3 is never imported here)
+                    import traceback
+                    msg += '\n\nOriginal traceback:\n' + traceback.format_exc()
+                raise IndexError(msg)
+
+            if self.ndim == 0:
+                # work around for netCDF4-python's broken handling of 0-d
+                # arrays (slicing them always returns a 1-dimensional array):
+                # https://github.com/Unidata/netcdf4-python/pull/220
+                data = np.asscalar(data)
+            return data
+
 def maybe_decode_bytes(txt):
     if isinstance(txt, bytes_type):
         return txt.decode('utf-8')
@@ -37,6 +65,9 @@ def _read_attributes(h5netcdf_var):
 _extract_h5nc_encoding = functools.partial(_extract_nc4_variable_encoding,
                                            lsd_okay=False, backend='h5netcdf')
 
+def _close_ds(ds):
+    # netCDF4 only allows closing the root group
+    find_root(ds).close()
 
 def _open_h5netcdf_group(filename, mode, group):
     import h5netcdf.legacyapi
@@ -55,43 +86,63 @@ def __init__(self, filename, mode='r', format=None, group=None,
         opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
                                    group=group)
         self.ds = opener()
+        self._isopen = True
         self.format = format
         self._opener = opener
         self._filename = filename
         self._mode = mode
         super(H5NetCDFStore, self).__init__(writer)
 
+    @contextlib.contextmanager
+    def ensure_open(self, autoclose=True):
+        if not self._isopen:
+            try:
+                self.ds = self._opener()
+                self._isopen = True
+                yield
+            finally:
+                if autoclose:
+                    self.close()
+        else:
+            yield
+
     def open_store_variable(self, name, var):
-        dimensions = var.dimensions
-        data = indexing.LazilyIndexedArray(BaseNetCDF4Array(name, self))
-        attrs = _read_attributes(var)
+        with self.ensure_open():
+            dimensions = var.dimensions
+            data = indexing.LazilyIndexedArray(H5NetcdFArrayWrapper(name, self))
+            attrs = _read_attributes(var)
 
-        # netCDF4 specific encoding
-        encoding = dict(var.filters())
-        chunking = var.chunking()
-        encoding['chunksizes'] = chunking if chunking != 'contiguous' else None
+            # netCDF4 specific encoding
+            encoding = dict(var.filters())
+            chunking = var.chunking()
+            encoding['chunksizes'] = chunking if chunking != 'contiguous' else None
 
-        # save source so __repr__ can detect if it's local or not
-        encoding['source'] = self._filename
-        encoding['original_shape'] = var.shape
+            # save source so __repr__ can detect if it's local or not
+            encoding['source'] = self._filename
+            encoding['original_shape'] = var.shape
 
         return Variable(dimensions, data, attrs, encoding)
 
     def get_variables(self):
-        return FrozenOrderedDict((k, self.open_store_variable(k, v))
-                                 for k, v in iteritems(self.ds.variables))
+        with self.ensure_open():
+            return FrozenOrderedDict((k, self.open_store_variable(k, v))
+                                     for k, v in iteritems(self.ds.variables))
 
     def get_attrs(self):
-        return Frozen(_read_attributes(self.ds))
+        with self.ensure_open():
+            return FrozenOrderedDict(_read_attributes(self.ds))
 
     def get_dimensions(self):
-        return self.ds.dimensions
+        with self.ensure_open():
+            return self.ds.dimensions
 
     def set_dimension(self, name, length):
-        self.ds.createDimension(name, size=length)
+        with self.ensure_open(autoclose=False):
+            self.ds.createDimension(name, size=length)
 
     def set_attribute(self, key, value):
-        self.ds.setncattr(key, value)
+        with self.ensure_open(autoclose=False):
+            self.ds.setncattr(key, value)
 
     def prepare_variable(self, name, variable, check_encoding=False,
                          unlimited_dims=None):
@@ -126,15 +177,15 @@ def prepare_variable(self, name, variable, check_encoding=False,
 
         for k, v in iteritems(attrs):
             nc4_var.setncattr(k, v)
+
         return nc4_var, variable.data
 
     def sync(self):
-        super(H5NetCDFStore, self).sync()
-        self.ds.sync()
+        with self.ensure_open():
+            super(H5NetCDFStore, self).sync()
+            self.ds.sync()
 
     def close(self):
-        ds = self.ds
-        # netCDF4 only allows closing the root group
-        while ds.parent is not None:
-            ds = ds.parent
-        ds.close()
+        if self._isopen:
+            _close_ds(self.ds)
+            self._isopen = False