diff --git a/lib/iris/__init__.py b/lib/iris/__init__.py
index 38465472ee3..0e6670533fa 100644
--- a/lib/iris/__init__.py
+++ b/lib/iris/__init__.py
@@ -89,12 +89,12 @@ def callback(cube, field, filename):
 
 """
 
+from collections.abc import Iterable
 import contextlib
 import glob
 import importlib
 import itertools
 import os.path
-import pathlib
 import threading
 
 import iris._constraints
@@ -256,7 +256,8 @@ def context(self, **kwargs):
 
 def _generate_cubes(uris, callback, constraints):
     """Returns a generator of cubes given the URIs and a callback."""
-    if isinstance(uris, (str, pathlib.PurePath)):
+    if isinstance(uris, str) or not isinstance(uris, Iterable):
+        # Make a string, or other single item, into an iterable.
         uris = [uris]
 
     # Group collections of uris by their iris handler
@@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints):
             urls = [":".join(x) for x in groups]
             for cube in iris.io.load_http(urls, callback):
                 yield cube
+        elif scheme == "data":
+            data_objects = [x[1] for x in groups]
+            for cube in iris.io.load_data_objects(data_objects, callback):
+                yield cube
         else:
             raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)
diff --git a/lib/iris/fileformats/__init__.py b/lib/iris/fileformats/__init__.py
index 96a848deb04..ceafa5b97e7 100644
--- a/lib/iris/fileformats/__init__.py
+++ b/lib/iris/fileformats/__init__.py
@@ -9,6 +9,7 @@
 """
 
 from iris.io.format_picker import (
+    DataSourceObjectProtocol,
     FileExtension,
     FormatAgent,
     FormatSpecification,
@@ -125,16 +126,32 @@ def _load_grib(*args, **kwargs):
 )
 
 
-_nc_dap = FormatSpecification(
-    "NetCDF OPeNDAP",
-    UriProtocol(),
-    lambda protocol: protocol in ["http", "https"],
-    netcdf.load_cubes,
-    priority=6,
-    constraint_aware_handler=True,
+FORMAT_AGENT.add_spec(
+    FormatSpecification(
+        "NetCDF OPeNDAP",
+        UriProtocol(),
+        lambda protocol: protocol in ["http", "https"],
+        netcdf.load_cubes,
+        priority=6,
+        constraint_aware_handler=True,
+    )
 )
-FORMAT_AGENT.add_spec(_nc_dap)
-del _nc_dap
+
+# NetCDF file presented as an open, readable netCDF4 dataset (or mimic).
+FORMAT_AGENT.add_spec(
+    FormatSpecification(
+        "NetCDF dataset",
+        DataSourceObjectProtocol(),
+        lambda object: all(
+            hasattr(object, x)
+            for x in ("variables", "dimensions", "groups", "ncattrs")
+        ),
+        netcdf.load_cubes,  # the same loader call: it must distinguish filepaths from datasets.
+        priority=4,
+        constraint_aware_handler=True,
+    )
+)
+
 
 #
 # UM Fieldsfiles.
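The new "NetCDF dataset" spec above recognises pre-opened datasets purely by duck-typing, not by class, so any object mimicking the netCDF4.Dataset attributes will match. A minimal sketch of that check in isolation (not part of the diff; `"example.nc"` is a placeholder path and `looks_like_nc_dataset` is an illustrative helper):

```python
import netCDF4


def looks_like_nc_dataset(obj) -> bool:
    # Mirrors the lambda in the "NetCDF dataset" FormatSpecification above:
    # accept any object exposing the netCDF4.Dataset-style attributes.
    return all(
        hasattr(obj, attr)
        for attr in ("variables", "dimensions", "groups", "ncattrs")
    )


ds = netCDF4.Dataset("example.nc")  # placeholder path to any NetCDF file
assert looks_like_nc_dataset(ds)  # an open dataset matches
assert not looks_like_nc_dataset("example.nc")  # a filepath string does not
ds.close()
```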
diff --git a/lib/iris/fileformats/cf.py b/lib/iris/fileformats/cf.py
index 0b058abba69..2ed01846bd4 100644
--- a/lib/iris/fileformats/cf.py
+++ b/lib/iris/fileformats/cf.py
@@ -1043,17 +1043,25 @@ class CFReader:
     # TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in.
     CFGroup = CFGroup
 
-    def __init__(self, filename, warn=False, monotonic=False):
-        self._dataset = None
-        self._filename = os.path.expanduser(filename)
+    def __init__(self, file_source, warn=False, monotonic=False):
+        # Ensure safe operation of the destructor, should __init__ fail.
+        self._own_file = False
+        if isinstance(file_source, str):
+            # Created from a filepath: open it and own it (close it ourselves).
+            self._filename = os.path.expanduser(file_source)
+            self._dataset = _thread_safe_nc.DatasetWrapper(
+                self._filename, mode="r"
+            )
+            self._own_file = True
+        else:
+            # We have been passed an open dataset.
+            # We use it, but don't own it (so we don't close it).
+            self._dataset = file_source
+            self._filename = self._dataset.filepath()
 
         #: Collection of CF-netCDF variables associated with this netCDF file
         self.cf_group = self.CFGroup()
 
-        self._dataset = _thread_safe_nc.DatasetWrapper(
-            self._filename, mode="r"
-        )
-
         # Issue load optimisation warning.
         if warn and self._dataset.file_format in [
             "NETCDF3_CLASSIC",
@@ -1311,7 +1319,7 @@ def _reset(self):
 
     def _close(self):
         # Explicitly close dataset to prevent file remaining open.
-        if self._dataset is not None:
+        if self._own_file and self._dataset is not None:
             self._dataset.close()
             self._dataset = None
 
diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py
index 113f40b3c92..91b5ea65cbb 100644
--- a/lib/iris/fileformats/netcdf/loader.py
+++ b/lib/iris/fileformats/netcdf/loader.py
@@ -13,6 +13,7 @@
 Also : `CF Conventions <https://cfconventions.org/>`_.
 
 """
+from collections.abc import Iterable
 import warnings
 
 import numpy as np
@@ -483,14 +484,15 @@ def inner(cf_datavar):
     return result
 
 
-def load_cubes(filenames, callback=None, constraints=None):
+def load_cubes(file_sources, callback=None, constraints=None):
     """
-    Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
+    Loads cubes from a list of NetCDF file sources: filenames, OPeNDAP URLs,
+    or open datasets.
 
     Args:
 
-    * filenames (string/list):
-        One or more NetCDF filenames/OPeNDAP URLs to load from.
+    * file_sources (string/list):
+        One or more NetCDF filenames/OPeNDAP URLs to load from, or open
+        datasets.
 
     Kwargs:
@@ -518,18 +520,18 @@ def load_cubes(filenames, callback=None, constraints=None):
     # Create an actions engine.
     engine = _actions_engine()
 
-    if isinstance(filenames, str):
-        filenames = [filenames]
+    if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
+        file_sources = [file_sources]
 
-    for filename in filenames:
-        # Ingest the netCDF file.
+    for file_source in file_sources:
+        # Ingest the file source: currently a filepath or an open netCDF4.Dataset.
         meshes = {}
         if PARSE_UGRID_ON_LOAD:
             cf_reader_class = CFUGridReader
         else:
             cf_reader_class = iris.fileformats.cf.CFReader
 
-        with cf_reader_class(filename) as cf:
+        with cf_reader_class(file_source) as cf:
             if PARSE_UGRID_ON_LOAD:
                 meshes = _meshes_from_cf(cf)
 
@@ -563,7 +565,7 @@ def load_cubes(filenames, callback=None, constraints=None):
                 if mesh is not None:
                     mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
 
-                cube = _load_cube(engine, cf, cf_var, filename)
+                cube = _load_cube(engine, cf, cf_var, cf.filename)
 
                 # Attach the mesh (if present) to the cube.
                 for mesh_coord in mesh_coords:
@@ -577,7 +579,7 @@ def load_cubes(filenames, callback=None, constraints=None):
                     warnings.warn("{}".format(e))
 
                 # Perform any user registered callback function.
-                cube = run_callback(callback, cube, cf_var, filename)
+                cube = run_callback(callback, cube, cf_var, file_source)
 
                 # Callback mechanism may return None, which must not be yielded
                 if cube is None:
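With CFReader and load_cubes accepting either form, an open dataset can now stand in for a filepath at the public load API. A usage sketch, assuming `"my_data.nc"` is a placeholder path to any loadable NetCDF file; note the ownership rule from the CFReader change: a dataset passed in is used but never closed by the reader.

```python
import iris
import netCDF4

ds = netCDF4.Dataset("my_data.nc", mode="r")  # placeholder path
try:
    # Dispatched via the new 'data' URI scheme to the "NetCDF dataset" handler.
    cubes = iris.load(ds)
finally:
    # The reader does not own the dataset, so closing is the caller's job.
    ds.close()
print(cubes)
```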
diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index 5c11d804db7..ebc4f991567 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -374,6 +374,7 @@ def __init__(self, filename, netcdf_format, compute=True):
         ----------
         filename : string
             Name of the netCDF file to save the cube.
+            OR a writeable object supporting the netCDF4.Dataset API.
         netcdf_format : string
             Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
@@ -427,7 +428,7 @@ def __init__(self, filename, netcdf_format, compute=True):
         #: A dictionary, mapping formula terms to owner cf variable name
         self._formula_terms_cache = {}
         #: Target filepath
-        self.filepath = os.path.abspath(filename)
+        self.filepath = None
         #: A list of delayed writes for lazy saving
         self._delayed_writes = (
             []
@@ -438,32 +439,44 @@ def __init__(self, filename, netcdf_format, compute=True):
         #: A per-file write lock to prevent dask attempting overlapping writes.
         self.file_write_lock = _dask_locks.get_worker_lock(self.filepath)
         #: NetCDF dataset
-        self._dataset = None
-        try:
-            self._dataset = _thread_safe_nc.DatasetWrapper(
-                self.filepath, mode="w", format=netcdf_format
-            )
-        except RuntimeError:
-            dir_name = os.path.dirname(self.filepath)
-            if not os.path.isdir(dir_name):
-                msg = "No such file or directory: {}".format(dir_name)
-                raise IOError(msg)
-            if not os.access(dir_name, os.R_OK | os.W_OK):
-                msg = "Permission denied: {}".format(self.filepath)
-                raise IOError(msg)
-            else:
-                raise
+        self._dataset = None  # this line is just for the API page
+
+        # Detect whether we were passed a pre-opened dataset (or a mimic of one).
+        self._to_open_dataset = hasattr(filename, "createVariable")
+        if self._to_open_dataset:
+            # Given a dataset: derive the instance filepath from the dataset.
+            self._dataset = filename
+            self.filepath = self._dataset.filepath()
+        else:
+            # Given a filepath string/path: create a dataset there.
+            try:
+                self.filepath = os.path.abspath(filename)
+                self._dataset = _thread_safe_nc.DatasetWrapper(
+                    self.filepath, mode="w", format=netcdf_format
+                )
+            except RuntimeError:
+                dir_name = os.path.dirname(self.filepath)
+                if not os.path.isdir(dir_name):
+                    msg = "No such file or directory: {}".format(dir_name)
+                    raise IOError(msg)
+                if not os.access(dir_name, os.R_OK | os.W_OK):
+                    msg = "Permission denied: {}".format(self.filepath)
+                    raise IOError(msg)
+                else:
+                    raise
 
     def __enter__(self):
         return self
 
     def __exit__(self, type, value, traceback):
         """Flush any buffered data to the CF-netCDF file before closing."""
-        self._dataset.sync()
-        self._dataset.close()
-        if self.compute:
-            self.complete()
+        if not self._to_open_dataset:
+            # Only close the dataset if the Saver created it.
+            self._dataset.close()
+        # Complete any delayed writes after closing, if required.
+        if self.compute:
+            self.complete()
 
     def write(
         self,
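The save path mirrors this: `iris.save` can be handed an open, writeable dataset, which the Saver writes into but never closes. A sketch following the TestDatasetSave test further down (`"input.nc"` and `"output.nc"` are placeholder paths); `saver="nc"` must be named explicitly, since a dataset object has no filename extension from which to infer the format:

```python
import iris
import netCDF4

cubes = iris.load("input.nc")  # placeholder source data
ds = netCDF4.Dataset("output.nc", mode="w")  # placeholder output path
# No filename extension to sniff, so the saver is named explicitly.
iris.save(cubes, ds, saver="nc")
# The Saver does not own the dataset: the caller closes it, and closing
# flushes the data to disk.
ds.close()
```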
diff --git a/lib/iris/io/__init__.py b/lib/iris/io/__init__.py
index 7680d9bac60..f9f1c0ed618 100644
--- a/lib/iris/io/__init__.py
+++ b/lib/iris/io/__init__.py
@@ -94,6 +94,8 @@ def decode_uri(uri, default="file"):
     In addition to well-formed URIs, it also supports bare file paths as
     strings or :class:`pathlib.PurePath`.
     Both Windows and UNIX style paths are accepted.
+    It also supports 'bare objects', i.e. anything which is not a string.
+    These are identified with a scheme of 'data', and returned unchanged.
 
     .. testsetup::
 
@@ -119,20 +121,31 @@ def decode_uri(uri, default="file"):
         >>> print(decode_uri('dataZoo/...'))
         ('file', 'dataZoo/...')
 
+        >>> print(decode_uri({}))
+        ('data', {})
+
     """
     if isinstance(uri, pathlib.PurePath):
         uri = str(uri)
-    # make sure scheme has at least 2 letters to avoid windows drives
-    # put - last in the brackets so it refers to the character, not a range
-    # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
-    match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
-    if match:
-        scheme = match.group(1)
-        part = match.group(2)
+
+    if isinstance(uri, str):
+        # make sure scheme has at least 2 letters to avoid windows drives
+        # put - last in the brackets so it refers to the character, not a range
+        # reference on valid schemes: http://tools.ietf.org/html/std66#section-3.1
+        match = re.match(r"^([a-zA-Z][a-zA-Z0-9+.-]+):(.+)", uri)
+        if match:
+            scheme = match.group(1)
+            part = match.group(2)
+        else:
+            # Catch bare UNIX and Windows paths
+            scheme = default
+            part = uri
     else:
-        # Catch bare UNIX and Windows paths
-        scheme = default
+        # We can pass things other than strings, like open files.
+        # These are simply identified as 'data objects'.
+        scheme = "data"
         part = uri
+
     return scheme, part
 
@@ -255,6 +268,25 @@ def load_http(urls, callback):
             yield cube
 
 
+def load_data_objects(urls, callback):
+    """
+    Takes a list of data-source objects and a callback function, and returns
+    a generator of Cubes.
+    The 'objects' take the place of 'uris' in the load calls.
+    The appropriate types of the data-source objects are expected to be
+    recognised by the handlers: this is done in the usual way, by passing the
+    objects to the format picker to get a handler for each.
+
+    .. note::
+
+        Typically, this function should not be called directly; instead, the
+        intended interface for loading is :func:`iris.load`.
+
+    """
+    # NOTE: this operation is currently *identical* to the http one.
+    yield from load_http(urls, callback)
+
+
 def _dot_save(cube, target):
     # A simple wrapper for `iris.fileformats.dot.save` which allows the
     # saver to be registered without triggering the import of
diff --git a/lib/iris/io/format_picker.py b/lib/iris/io/format_picker.py
index a8e333c5662..9def0ada986 100644
--- a/lib/iris/io/format_picker.py
+++ b/lib/iris/io/format_picker.py
@@ -331,3 +331,22 @@ def get_element(self, basename, file_handle):
         from iris.io import decode_uri
 
         return decode_uri(basename)[0]
+
+
+class DataSourceObjectProtocol(FileElement):
+    """
+    A :class:`FileElement` that simply returns the URI entry itself.
+
+    This enables an arbitrary non-string data object to be passed, subject to
+    subsequent checks on the object itself (specified in the handler).
+
+    """
+
+    def __init__(self):
+        super().__init__(requires_fh=False)
+
+    def get_element(self, basename, file_handle):
+        # In this context, there should *not* be a file opened by the handler.
+        # Just return 'basename', which in this case is not a name, or even a
+        # string, but a passed 'data object'.
+        return basename
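For reference, the extended `decode_uri` dispatch in one place; these calls follow directly from the doctests above, with `example.com` as a placeholder host:

```python
from iris.io import decode_uri

# Strings are parsed as URIs, falling back to the 'file' scheme ...
print(decode_uri("http://example.com/data.nc"))  # ('http', '//example.com/data.nc')
print(decode_uri("/some/local/path.nc"))  # ('file', '/some/local/path.nc')
# ... while any non-string object passes straight through as 'data'.
print(decode_uri({}))  # ('data', {})
```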
diff --git a/lib/iris/tests/integration/netcdf/test_general.py b/lib/iris/tests/integration/netcdf/test_general.py
index 339a38fd1f7..02bbba55469 100644
--- a/lib/iris/tests/integration/netcdf/test_general.py
+++ b/lib/iris/tests/integration/netcdf/test_general.py
@@ -26,6 +26,7 @@
 from iris.cube import Cube, CubeList
 import iris.exceptions
 from iris.fileformats.netcdf import Saver, UnknownCellMethodWarning
+import iris.fileformats.netcdf._thread_safe_nc as threadsafe_nc
 from iris.tests.stock.netcdf import ncgen_from_cdl
 
@@ -361,5 +362,74 @@ def test_lat_not_loaded(self):
             _ = cube.coord("lat")
 
 
+@tests.skip_data
+class TestDatasetLoad(tests.IrisTest):
+    def test_basic_load(self):
+        # Test loading from an open Dataset, in place of a filepath spec.
+        filepath = tests.get_data_path(
+            ["NetCDF", "global", "xyz_t", "GEMS_CO2_Apr2006.nc"]
+        )
+        phenom_id = "Carbon Dioxide"
+        expected = iris.load_cube(filepath, phenom_id)
+        ds = None
+        try:
+            ds = threadsafe_nc.DatasetWrapper(filepath)
+            result = iris.load_cube(ds, phenom_id)
+        finally:
+            if ds is not None:
+                ds.close()
+
+        self.assertEqual(expected, result)
+
+
+@tests.skip_data
+class TestDatasetSave(tests.IrisTest):
+    @classmethod
+    def setUpClass(cls):
+        # Create a temp directory for transient test files.
+        cls.temp_dir = tempfile.mkdtemp()
+
+    @classmethod
+    def tearDownClass(cls):
+        # Destroy the temp directory.
+        shutil.rmtree(cls.temp_dir)
+
+    def test_basic_save(self):
+        # Test saving to a Dataset, in place of a filepath spec.
+
+        # Load some test data (--> 2 cubes).
+        filepath = tests.get_data_path(
+            ["NetCDF", "global", "xyz_t", "GEMS_CO2_Apr2006.nc"]
+        )
+        testdata = iris.load(filepath)
+
+        # Give the cubes a definite order, since this is not stable!
+        testdata = sorted(testdata, key=lambda cube: cube.name())
+
+        # Save to a netcdf file in the usual way.
+        filepath_direct = f"{self.temp_dir}/tmp_direct.nc"
+        iris.save(testdata, filepath_direct)
+        # Check against the test-specific CDL result file.
+        self.assertCDL(filepath_direct)
+
+        # Save indirectly, via a netcdf dataset.
+        filepath_indirect = f"{self.temp_dir}/tmp_indirect.nc"
+        nc_dataset = threadsafe_nc.DatasetWrapper(filepath_indirect, "w")
+        iris.save(testdata, nc_dataset, saver="nc")
+        # Do some very basic sanity checks on the Dataset object.
+        self.assertEqual(
+            ["time", "levelist", "latitude", "longitude"],
+            list(nc_dataset.dimensions),
+        )
+        self.assertEqual(
+            ["co2", "time", "levelist", "latitude", "longitude", "lnsp"],
+            list(nc_dataset.variables),
+        )
+        # Closing the dataset flushes the data to file.
+        nc_dataset.close()
+        # Check the saved file against the same CDL as the 'normal' save.
+        self.assertCDL(filepath_indirect)
+
+
 if __name__ == "__main__":
     tests.main()
diff --git a/lib/iris/tests/results/file_load/known_loaders.txt b/lib/iris/tests/results/file_load/known_loaders.txt
index 9b0a0745745..98ac3e4a077 100644
--- a/lib/iris/tests/results/file_load/known_loaders.txt
+++ b/lib/iris/tests/results/file_load/known_loaders.txt
@@ -4,6 +4,7 @@
     * NetCDF 64 bit offset format (priority 5)
     * NetCDF_v4 (priority 5)
     * UM Post Processing file (PP) (priority 5)
+    * NetCDF dataset (priority 4)
    * UM Fieldsfile (FF) post v5.2 (priority 4)
     * ABF (priority 3)
     * ABL (priority 3)
diff --git a/lib/iris/tests/results/integration/netcdf/general/TestDatasetSave/basic_save.cdl b/lib/iris/tests/results/integration/netcdf/general/TestDatasetSave/basic_save.cdl
new file mode 100644
index 00000000000..133c886d87b
--- /dev/null
+++ b/lib/iris/tests/results/integration/netcdf/general/TestDatasetSave/basic_save.cdl
@@ -0,0 +1,34 @@
+dimensions:
+    latitude = 181 ;
+    levelist = 60 ;
+    longitude = 360 ;
+    time = 1 ;
+variables:
+    double co2(time, levelist, latitude, longitude) ;
+        co2:long_name = "Carbon Dioxide" ;
+        co2:units = "kg kg**-1" ;
+    int time(time) ;
+        time:axis = "T" ;
+        time:units = "hours since 1900-01-01 00:00:0.0" ;
+        time:standard_name = "time" ;
+        time:long_name = "time" ;
+        time:calendar = "standard" ;
+    int levelist(levelist) ;
+        levelist:long_name = "model_level_number" ;
+    float latitude(latitude) ;
+        latitude:axis = "Y" ;
+        latitude:units = "degrees_north" ;
+        latitude:standard_name = "latitude" ;
+        latitude:long_name = "latitude" ;
+    float longitude(longitude) ;
+        longitude:axis = "X" ;
+        longitude:units = "degrees_east" ;
+        longitude:standard_name = "longitude" ;
+        longitude:long_name = "longitude" ;
+    double lnsp(time, levelist, latitude, longitude) ;
+        lnsp:long_name = "Logarithm of surface pressure" ;
+
+// global attributes:
+        :history = "2009-08-25 13:46:31 GMT by mars2netcdf-0.92" ;
+        :Conventions = "CF-1.7" ;
+}