Support netCDF load+save on dataset-like objects as well as filepaths. #5214

Merged · 10 commits · May 19, 2023
docs/src/whatsnew/latest.rst (5 changes: 4 additions & 1 deletion)
@@ -72,7 +72,10 @@ This document explains the changes made to Iris for this release
💼 Internal
===========

#. N/A
#. `@pp-mo`_ added support for loading and saving netCDF data via
:class:`netCDF4.Dataset` compatible objects in place of file-paths, as hooks
for a forthcoming `"Xarray bridge" <https://github.com/SciTools/iris/issues/4994>`_
facility. (:pull:`5214`)


.. comment
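As a usage note for this entry: a minimal sketch of the loading side, assuming an existing file.nc and that the top-level iris.load API accepts the open dataset directly (which is what "in place of file-paths" implies):

    import iris
    import netCDF4

    # Open the dataset ourselves; Iris borrows it rather than owning it.
    ds = netCDF4.Dataset("file.nc", mode="r")
    cubes = iris.load(ds)
    ds.close()  # the caller remains responsible for closing

Saving to an open, writeable dataset is the symmetric half of the feature, per the PR title.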
lib/iris/__init__.py (9 changes: 7 additions & 2 deletions)
@@ -89,12 +89,12 @@ def callback(cube, field, filename):

"""

from collections.abc import Iterable
import contextlib
import glob
import importlib
import itertools
import os.path
import pathlib
import threading

import iris._constraints
@@ -256,7 +256,8 @@ def context(self, **kwargs):

def _generate_cubes(uris, callback, constraints):
"""Returns a generator of cubes given the URIs and a callback."""
if isinstance(uris, (str, pathlib.PurePath)):
if isinstance(uris, str) or not isinstance(uris, Iterable):
# Make a string, or other single item, into an iterable.
uris = [uris]

# Group collections of uris by their iris handler
@@ -273,6 +274,10 @@ def _generate_cubes(uris, callback, constraints):
urls = [":".join(x) for x in groups]
for cube in iris.io.load_http(urls, callback):
yield cube
elif scheme == "data":
data_objects = [x[1] for x in groups]
for cube in iris.io.load_data_objects(data_objects, callback):
yield cube
else:
raise ValueError("Iris cannot handle the URI scheme: %s" % scheme)

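The revised check above generalises the old str/pathlib.PurePath test: anything that is a string, or not iterable, is wrapped into a one-item list before grouping by URI scheme. A standalone sketch of just that normalisation (the helper name is hypothetical):

    from collections.abc import Iterable

    def _as_source_list(uris):
        # Strings and non-iterable objects (e.g. an open netCDF4.Dataset,
        # which defines no __iter__) become a one-item list.
        if isinstance(uris, str) or not isinstance(uris, Iterable):
            uris = [uris]
        return uris

    assert _as_source_list("a.nc") == ["a.nc"]
    assert _as_source_list(["a.nc", "b.nc"]) == ["a.nc", "b.nc"]

Dataset-like objects then surface under the new "data" scheme and are handed to iris.io.load_data_objects.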
lib/iris/fileformats/__init__.py (37 changes: 28 additions & 9 deletions)
@@ -9,6 +9,7 @@
"""

from iris.io.format_picker import (
DataSourceObjectProtocol,
FileExtension,
FormatAgent,
FormatSpecification,
@@ -125,16 +126,34 @@ def _load_grib(*args, **kwargs):
)


_nc_dap = FormatSpecification(
"NetCDF OPeNDAP",
UriProtocol(),
lambda protocol: protocol in ["http", "https"],
netcdf.load_cubes,
priority=6,
constraint_aware_handler=True,
FORMAT_AGENT.add_spec(
FormatSpecification(
"NetCDF OPeNDAP",
UriProtocol(),
lambda protocol: protocol in ["http", "https"],
netcdf.load_cubes,
priority=6,
constraint_aware_handler=True,
)
)

# NetCDF file presented as an open, readable netCDF4 dataset (or mimic).
FORMAT_AGENT.add_spec(
FormatSpecification(
"NetCDF dataset",
DataSourceObjectProtocol(),
lambda object: all(
hasattr(object, x)
for x in ("variables", "dimensions", "groups", "ncattrs")
),
# Note: this uses the same call as the above "NetCDF_v4" (and "NetCDF OPeNDAP")
# The handler itself needs to detect what is passed + handle it appropriately.
netcdf.load_cubes,
priority=4,
constraint_aware_handler=True,
)
)
FORMAT_AGENT.add_spec(_nc_dap)
del _nc_dap


#
# UM Fieldsfiles.
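The new "NetCDF dataset" spec identifies a source purely by duck-typing rather than by file signature. A quick sketch of that predicate in isolation (assumes an existing file.nc):

    import netCDF4

    ds = netCDF4.Dataset("file.nc", mode="r")
    # The same four attributes tested by the spec above:
    print(all(hasattr(ds, x) for x in ("variables", "dimensions", "groups", "ncattrs")))
    # -> True, for netCDF4.Dataset and for compatible mimics

Because this spec reuses netcdf.load_cubes as its handler, distinguishing a filepath from an open dataset is left to the loader itself, as the inline note says.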
lib/iris/fileformats/cf.py (24 changes: 16 additions & 8 deletions)
@@ -1043,17 +1043,25 @@ class CFReader:
# TODO: remove once iris.experimental.ugrid.CFUGridReader is folded in.
CFGroup = CFGroup

def __init__(self, filename, warn=False, monotonic=False):
self._dataset = None
self._filename = os.path.expanduser(filename)
def __init__(self, file_source, warn=False, monotonic=False):
# Ensure safe operation for destructor, should init fail.
self._own_file = False
if isinstance(file_source, str):
# Create from filepath : open it + own it (=close when we die).
self._filename = os.path.expanduser(file_source)
self._dataset = _thread_safe_nc.DatasetWrapper(
self._filename, mode="r"
)
self._own_file = True
else:
# We have been passed an open dataset.
# We use it but don't own it (don't close it).
self._dataset = file_source
self._filename = self._dataset.filepath()

#: Collection of CF-netCDF variables associated with this netCDF file
self.cf_group = self.CFGroup()

self._dataset = _thread_safe_nc.DatasetWrapper(
self._filename, mode="r"
)

# Issue load optimisation warning.
if warn and self._dataset.file_format in [
"NETCDF3_CLASSIC",
@@ -1311,7 +1319,7 @@ def _reset(self):

def _close(self):
# Explicitly close dataset to prevent file remaining open.
if self._dataset is not None:
if self._own_file and self._dataset is not None:
self._dataset.close()
self._dataset = None

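The ownership rule introduced here is: CFReader closes only what it opened. A hedged usage sketch (assumes an existing file.nc; passes a raw netCDF4.Dataset, which the new branch accepts as any non-string source):

    import netCDF4
    from iris.fileformats.cf import CFReader

    ds = netCDF4.Dataset("file.nc", mode="r")
    with CFReader(ds) as cf:      # reader borrows the dataset (_own_file=False)
        print(cf.cf_group)        # CF variables parsed from the open dataset
    assert ds.isopen()            # still open: _close() skipped the borrowed dataset
    ds.close()                    # caller closes what the caller opened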
lib/iris/fileformats/netcdf/_thread_safe_nc.py (30 changes: 28 additions & 2 deletions)
@@ -35,22 +35,42 @@ class _ThreadSafeWrapper(ABC):
the C-layer.
"""

# Note: this is only used to create a "contained" from passed args.
CONTAINED_CLASS = NotImplemented
# Note: this defines how we check that a contained object is of the expected type
# (in a duck-typed way).
_DUCKTYPE_CHECK_PROPERTIES: typing.List[str] = [NotImplemented]

# Allows easy type checking, avoiding difficulties with isinstance and mocking.
THREAD_SAFE_FLAG = True

@classmethod
def is_contained_type(cls, instance):
return all(
hasattr(instance, attr) for attr in cls._DUCKTYPE_CHECK_PROPERTIES
)

@classmethod
def _from_existing(cls, instance):
"""Pass an existing instance to __init__, where it is contained."""
assert isinstance(instance, cls.CONTAINED_CLASS)
assert cls.is_contained_type(instance)
return cls(instance)

def __init__(self, *args, **kwargs):
"""Contain an existing instance, or generate a new one from arguments."""
if isinstance(args[0], self.CONTAINED_CLASS):
if len(args) > 0 and self.is_contained_type(args[0]):
# Passed a contained-type object : Wrap ourself around that.
instance = args[0]
# Explicitly ban "wrapping a wrapper".
if hasattr(instance, "THREAD_SAFE_FLAG"):
msg = (
"Cannot create {cls} containing an existing {cls}.".format(
cls=self.__class__.__name__
)
)
raise ValueError(msg)
else:
# Create a contained object of the intended type from passed args.
with _GLOBAL_NETCDF4_LOCK:
instance = self.CONTAINED_CLASS(*args, **kwargs)

@@ -89,6 +109,7 @@ class DimensionWrapper(_ThreadSafeWrapper):
"""

CONTAINED_CLASS = netCDF4.Dimension
_DUCKTYPE_CHECK_PROPERTIES = ["isunlimited"]


class VariableWrapper(_ThreadSafeWrapper):
@@ -99,6 +120,7 @@ class VariableWrapper(_ThreadSafeWrapper):
"""

CONTAINED_CLASS = netCDF4.Variable
_DUCKTYPE_CHECK_PROPERTIES = ["dimensions", "dtype"]

def setncattr(self, *args, **kwargs) -> None:
"""
@@ -147,6 +169,8 @@ class GroupWrapper(_ThreadSafeWrapper):
"""

CONTAINED_CLASS = netCDF4.Group
# Note: will also accept a whole Dataset object, but that is OK.
_DUCKTYPE_CHECK_PROPERTIES = ["createVariable"]

# All Group API that returns Dimension(s) is wrapped to instead return
# DimensionWrapper(s).
@@ -281,6 +305,8 @@ class DatasetWrapper(GroupWrapper):
"""

CONTAINED_CLASS = netCDF4.Dataset
# Note: 'close' exists on Dataset but not Group (though a rather weak distinction).
_DUCKTYPE_CHECK_PROPERTIES = ["createVariable", "close"]

@classmethod
def fromcdl(cls, *args, **kwargs):
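A short sketch of the containment rules added above (assumes an existing file.nc):

    import netCDF4
    from iris.fileformats.netcdf import _thread_safe_nc

    raw = netCDF4.Dataset("file.nc", mode="r")
    # Accepted: 'raw' has both "createVariable" and "close", so it passes
    # DatasetWrapper.is_contained_type() and is contained as-is.
    wrapped = _thread_safe_nc.DatasetWrapper(raw)
    assert wrapped.THREAD_SAFE_FLAG

    # Rejected: "wrapping a wrapper" raises ValueError, per the explicit ban.
    try:
        _thread_safe_nc.DatasetWrapper(wrapped)
    except ValueError as err:
        print(err)  # Cannot create DatasetWrapper containing an existing DatasetWrapper.

Note how the isinstance check against CONTAINED_CLASS became a hasattr-based check, so that mimic objects (not literal netCDF4 instances) can also be contained.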
lib/iris/fileformats/netcdf/loader.py (20 changes: 11 additions & 9 deletions)
@@ -13,6 +13,7 @@
Also : `CF Conventions <https://cfconventions.org/>`_.

"""
from collections.abc import Iterable
import warnings

import numpy as np
@@ -483,14 +484,15 @@ def inner(cf_datavar):
return result


def load_cubes(filenames, callback=None, constraints=None):
def load_cubes(file_sources, callback=None, constraints=None):
"""
Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.

Args:

* filenames (string/list):
* file_sources (string/list):
One or more NetCDF filenames/OPeNDAP URLs to load from,
or open datasets.

Kwargs:

@@ -518,18 +520,18 @@ def load_cubes(filenames, callback=None, constraints=None):
# Create an actions engine.
engine = _actions_engine()

if isinstance(filenames, str):
filenames = [filenames]
if isinstance(file_sources, str) or not isinstance(file_sources, Iterable):
file_sources = [file_sources]

for filename in filenames:
# Ingest the netCDF file.
for file_source in file_sources:
# Ingest the file. At present may be a filepath or an open netCDF4.Dataset.
meshes = {}
if PARSE_UGRID_ON_LOAD:
cf_reader_class = CFUGridReader
else:
cf_reader_class = iris.fileformats.cf.CFReader

with cf_reader_class(filename) as cf:
with cf_reader_class(file_source) as cf:
if PARSE_UGRID_ON_LOAD:
meshes = _meshes_from_cf(cf)

@@ -563,7 +565,7 @@ def load_cubes(filenames, callback=None, constraints=None):
if mesh is not None:
mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)

cube = _load_cube(engine, cf, cf_var, filename)
cube = _load_cube(engine, cf, cf_var, cf.filename)

# Attach the mesh (if present) to the cube.
for mesh_coord in mesh_coords:
@@ -577,7 +579,7 @@ def load_cubes(filenames, callback=None, constraints=None):
warnings.warn("{}".format(e))

# Perform any user registered callback function.
cube = run_callback(callback, cube, cf_var, filename)
cube = run_callback(callback, cube, cf_var, file_source)

# Callback mechanism may return None, which must not be yielded
if cube is None:
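Finally, an end-to-end sketch of the updated loader (assumes an existing file.nc):

    import netCDF4
    from iris.fileformats.netcdf.loader import load_cubes

    ds = netCDF4.Dataset("file.nc", mode="r")
    # A single open dataset is neither a str nor an Iterable, so it is
    # wrapped into a one-item list and ingested via CFReader, which borrows
    # rather than owns it.
    cubes = list(load_cubes(ds))
    ds.close()  # load_cubes leaves the dataset open for the caller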