feat(datasets): Add NetCDFDataSet class (#360)
* initialize template and early additions

Signed-off-by: Riley Brady <[email protected]>

* add placeholder for remote file system load

Signed-off-by: Riley Brady <[email protected]>

* switch to versioned dataset

Signed-off-by: Riley Brady <[email protected]>

* add initial remote -> local get for S3

Signed-off-by: Riley Brady <[email protected]>

* further generalize remote retrieval

Signed-off-by: Riley Brady <[email protected]>

* add in credentials

Signed-off-by: Riley Brady <[email protected]>

* make temppath optional for remote datasets

Signed-off-by: Riley Brady <[email protected]>

* add initial idea for multifile glob

Signed-off-by: Riley Brady <[email protected]>

* style: Introduce `ruff` for linting in all plugins. (#354)

Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* add suggested style changes

Signed-off-by: Riley Brady <[email protected]>

* add temppath to attributes

Signed-off-by: Riley Brady <[email protected]>

* more temppath fixes

Signed-off-by: Riley Brady <[email protected]>

* more temppath updates

Signed-off-by: Riley Brady <[email protected]>

* add better tempfile deletion and work on saving files

Signed-off-by: Riley Brady <[email protected]>

* make __del__ flexible

Signed-off-by: Riley Brady <[email protected]>

* formatting

Signed-off-by: Riley Brady <[email protected]>

* feat(datasets): create custom `DeprecationWarning` (#356)

* feat(datasets): create custom `DeprecationWarning`

Signed-off-by: Deepyaman Datta <[email protected]>

* feat(datasets): use the custom deprecation warning

Signed-off-by: Deepyaman Datta <[email protected]>

* chore(datasets): show Kedro's deprecation warnings

Signed-off-by: Deepyaman Datta <[email protected]>

* fix(datasets): remove unused imports in test files

Signed-off-by: Deepyaman Datta <[email protected]>

---------

Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* docs(datasets): add note about DataSet deprecation (#357)

Signed-off-by: Riley Brady <[email protected]>

* test(datasets): skip `tensorflow` tests on Windows (#363)

Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* ci: Pin `tables` version (#370)

* Pin tables version

Signed-off-by: Ankita Katiyar <[email protected]>

* Also fix kedro-airflow

Signed-off-by: Ankita Katiyar <[email protected]>

* Revert trying to fix airflow

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* build(datasets): Release `1.7.1` (#378)

Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* docs: Update CONTRIBUTING.md and add one for `kedro-datasets` (#379)

Update CONTRIBUTING.md + add one for kedro-datasets

Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* ci(datasets): Run tensorflow tests separately from other dataset tests (#377)

Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* feat: Kedro-Airflow convert all pipelines option (#335)

* feat: kedro airflow convert --all option

Signed-off-by: Simon Brugman <[email protected]>

* docs: release docs

Signed-off-by: Simon Brugman <[email protected]>

---------

Signed-off-by: Simon Brugman <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* docs(datasets): blacken code in rst literal blocks (#362)

Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* docs: cloudpickle is an interesting extension of the pickle functionality (#361)

Signed-off-by: H. Felix Wittmann <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* fix(datasets): Fix secret scan entropy error (#383)

Fix secret scan entropy error

Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* style: Rename mentions of `DataSet` to `Dataset` in `kedro-airflow` and `kedro-telemetry` (#384)


Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* feat(datasets): Migrated `PartitionedDataSet` and `IncrementalDataSet` from main repository to kedro-datasets (#253)

Signed-off-by: Peter Bludau <[email protected]>
Co-authored-by: Merel Theisen <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* fix: backwards compatibility for `kedro-airflow` (#381)

Signed-off-by: Simon Brugman <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* fix(datasets): Don't warn for SparkDataset on Databricks when using s3 (#341)

Signed-off-by: Alistair McKelvie <[email protected]>
Signed-off-by: Riley Brady <[email protected]>

* update docs API and release notes

Signed-off-by: Riley Brady <[email protected]>

* add netcdf requirements to setup

Signed-off-by: Riley Brady <[email protected]>

* lint

Signed-off-by: Riley Brady <[email protected]>

* add initial tests

Signed-off-by: Riley Brady <[email protected]>

* update dataset exists for multifile

Signed-off-by: Riley Brady <[email protected]>

* Add full test suite for NetCDFDataSet

Signed-off-by: Riley Brady <[email protected]>

* Add docstring examples

Signed-off-by: Riley Brady <[email protected]>

* change xarray version req

Signed-off-by: Riley Brady <[email protected]>

* update dask req

Signed-off-by: Riley Brady <[email protected]>

* rename DataSet -> Dataset

Signed-off-by: Riley Brady <[email protected]>

* Update xarray reqs for earlier python versions

Signed-off-by: Riley Brady <[email protected]>

* fix setup

Signed-off-by: Riley Brady <[email protected]>

* update test coverage

Signed-off-by: Riley Brady <[email protected]>

* exclude init from test coverage

Signed-off-by: Riley Brady <[email protected]>

* Sub in pathlib for os.remove

Signed-off-by: Riley Brady <[email protected]>

* add metadata to dataset

Signed-off-by: Riley Brady <[email protected]>

* add doctest for the new datasets

Signed-off-by: Nok <[email protected]>

* add patch for supporting http/https

Signed-off-by: Riley Brady <[email protected]>

* Small fixes post-merge

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Lint

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Fix import

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Un-ignore NetCDF doctest

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Add fixture

Signed-off-by: Ankita Katiyar <[email protected]>

* Mark problematic test as xfail

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Skip problematic test instead of making it fail

Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>

* Skip problematic tests and fix failing tests

Signed-off-by: Ankita Katiyar <[email protected]>

* Remove comment

Signed-off-by: Ankita Katiyar <[email protected]>

---------

Signed-off-by: Riley Brady <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Deepyaman Datta <[email protected]>
Signed-off-by: Ankita Katiyar <[email protected]>
Signed-off-by: Simon Brugman <[email protected]>
Signed-off-by: H. Felix Wittmann <[email protected]>
Signed-off-by: Peter Bludau <[email protected]>
Signed-off-by: Alistair McKelvie <[email protected]>
Signed-off-by: Merel Theisen <[email protected]>
Signed-off-by: Nok Lam Chan <[email protected]>
Signed-off-by: Nok <[email protected]>
Signed-off-by: Juan Luis Cano Rodríguez <[email protected]>
Signed-off-by: Ankita Katiyar <[email protected]>
Co-authored-by: Merel Theisen <[email protected]>
Co-authored-by: Deepyaman Datta <[email protected]>
Co-authored-by: Ankita Katiyar <[email protected]>
Co-authored-by: Simon Brugman <[email protected]>
Co-authored-by: Felix Wittmann <[email protected]>
Co-authored-by: PtrBld <[email protected]>
Co-authored-by: Merel Theisen <[email protected]>
Co-authored-by: Alistair McKelvie <[email protected]>
Co-authored-by: Nok Lam Chan <[email protected]>
Co-authored-by: Juan Luis Cano Rodríguez <[email protected]>
Co-authored-by: Ankita Katiyar <[email protected]>
12 people authored Feb 28, 2024
1 parent db92892 commit 16dfbdd
Showing 8 changed files with 523 additions and 1 deletion.
4 changes: 4 additions & 0 deletions kedro-datasets/RELEASE.md
@@ -1,8 +1,12 @@
# Upcoming Release
## Major features and improvements
* Added `NetCDFDataset` for loading and saving `*.nc` files.

## Bug fixes and other changes
## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:
* [Riley Brady](https://github.com/riley-brady)


# Release 2.1.0
## Major features and improvements
1 change: 1 addition & 0 deletions kedro-datasets/docs/source/api/kedro_datasets.rst
@@ -23,6 +23,7 @@ kedro_datasets
kedro_datasets.json.JSONDataset
kedro_datasets.matlab.MatlabDataset
kedro_datasets.matplotlib.MatplotlibWriter
kedro_datasets.netcdf.NetCDFDataset
kedro_datasets.networkx.GMLDataset
kedro_datasets.networkx.GraphMLDataset
kedro_datasets.networkx.JSONDataset
14 changes: 14 additions & 0 deletions kedro-datasets/kedro_datasets/netcdf/__init__.py
@@ -0,0 +1,14 @@
"""``NetCDFDataset`` is an ``AbstractDataset`` to save and load NetCDF files."""
from __future__ import annotations

from typing import Any

import lazy_loader as lazy

# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
NetCDFDataset: type[NetCDFDataset]
NetCDFDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
    __name__, submod_attrs={"netcdf_dataset": ["NetCDFDataset"]}
)
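The `lazy.attach` call above registers `NetCDFDataset` for lazy import, so `kedro_datasets.netcdf` stays cheap to import and the class is only pulled in from `netcdf_dataset.py` on first access via the module-level `__getattr__`. A minimal sketch of that access (hypothetical file path; assumes the optional NetCDF dependencies are installed):

    >>> from kedro_datasets.netcdf import NetCDFDataset  # resolved lazily by lazy_loader
    >>> dataset = NetCDFDataset(filepath="data/01_raw/example.nc")  # hypothetical local file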
218 changes: 218 additions & 0 deletions kedro-datasets/kedro_datasets/netcdf/netcdf_dataset.py
@@ -0,0 +1,218 @@
"""NetCDFDataset loads and saves data to a local netcdf (.nc) file."""
import logging
from copy import deepcopy
from glob import glob
from pathlib import Path, PurePosixPath
from typing import Any

import fsspec
import xarray as xr
from kedro.io.core import (
AbstractDataset,
DatasetError,
get_protocol_and_path,
)

logger = logging.getLogger(__name__)


class NetCDFDataset(AbstractDataset):
"""``NetCDFDataset`` loads/saves data from/to a NetCDF file using an underlying
filesystem (e.g.: local, S3, GCS). It uses xarray to handle the NetCDF file.
Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog_yaml_examples.html>`_:
.. code-block:: yaml
single-file:
type: netcdf.NetCDFDataset
filepath: s3://bucket_name/path/to/folder/data.nc
save_args:
mode: a
load_args:
decode_times: False
multi-file:
type: netcdf.NetCDFDataset
filepath: s3://bucket_name/path/to/folder/data*.nc
load_args:
concat_dim: time
combine: nested
parallel: True
Example usage for the
`Python API <https://kedro.readthedocs.io/en/stable/data/\
advanced_data_catalog_usage.html>`_:
.. code-block:: pycon
>>> from kedro_datasets.netcdf import NetCDFDataset
>>> import xarray as xr
>>> ds = xr.DataArray(
... [0, 1, 2], dims=["x"], coords={"x": [0, 1, 2]}, name="data"
... ).to_dataset()
>>> dataset = NetCDFDataset(
... filepath="path/to/folder",
... save_args={"mode": "w"},
... )
>>> dataset.save(ds)
>>> reloaded = dataset.load()
"""

DEFAULT_LOAD_ARGS: dict[str, Any] = {}
DEFAULT_SAVE_ARGS: dict[str, Any] = {}

def __init__( # noqa
self,
*,
filepath: str,
temppath: str = None,
load_args: dict[str, Any] = None,
save_args: dict[str, Any] = None,
fs_args: dict[str, Any] = None,
credentials: dict[str, Any] = None,
metadata: dict[str, Any] = None,
):
"""Creates a new instance of ``NetCDFDataset`` pointing to a concrete NetCDF
file on a specific filesystem
Args:
filepath: Filepath in POSIX format to a NetCDF file prefixed with a
protocol like `s3://`. If prefix is not provided, `file` protocol
(local filesystem) will be used. The prefix should be any protocol
supported by ``fsspec``. It can also be a path to a glob. If a
glob is provided then it can be used for reading multiple NetCDF
files.
temppath: Local temporary directory, used when reading from remote storage,
since NetCDF files cannot be directly read from remote storage.
load_args: Additional options for loading NetCDF file(s).
Here you can find all available arguments when reading single file:
https://xarray.pydata.org/en/stable/generated/xarray.open_dataset.html
Here you can find all available arguments when reading multiple files:
https://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html
All defaults are preserved.
save_args: Additional saving options for saving NetCDF file(s).
Here you can find all available arguments:
https://xarray.pydata.org/en/stable/generated/xarray.Dataset.to_netcdf.html
All defaults are preserved.
fs_args: Extra arguments to pass into underlying filesystem class
constructor (e.g. `{"cache_regions": "us-east-1"}` for
``s3fs.S3FileSystem``).
credentials: Credentials required to get access to the underlying filesystem.
E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
metadata: Any arbitrary metadata.
This is ignored by Kedro, but may be consumed by users or external plugins.
"""
self._fs_args = deepcopy(fs_args) or {}
self._credentials = deepcopy(credentials) or {}
self._temppath = Path(temppath) if temppath is not None else None
protocol, path = get_protocol_and_path(filepath)
if protocol == "file":
self._fs_args.setdefault("auto_mkdir", True)
elif protocol != "file" and self._temppath is None:
raise ValueError(
"Need to set temppath in catalog if NetCDF file exists on remote "
+ "filesystem"
)
self._protocol = protocol
self._filepath = filepath

self._storage_options = {**self._credentials, **self._fs_args}
self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

self.metadata = metadata

# Handle default load and save arguments
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

# Determine if multiple NetCDF files are being loaded in.
self._is_multifile = (
True if "*" in str(PurePosixPath(self._filepath).stem) else False
)

def _load(self) -> xr.Dataset:
load_path = self._filepath

# If NetCDF(s) are on any type of remote storage, need to sync to local to open.
# Kerchunk could be implemented here in the future for direct remote reading.
if self._protocol != "file":
logger.info("Syncing remote NetCDF file to local storage.")

if self._is_multifile:
load_path = sorted(self._fs.glob(load_path))

self._fs.get(load_path, f"{self._temppath}/")
load_path = f"{self._temppath}/{self._filepath.stem}.nc"

if self._is_multifile:
data = xr.open_mfdataset(str(load_path), **self._load_args)
else:
data = xr.open_dataset(load_path, **self._load_args)

return data

def _save(self, data: xr.Dataset):
if self._is_multifile:
raise DatasetError(
"Globbed multifile datasets with '*' in filepath cannot be saved. "
+ "Create an alternate NetCDFDataset with a single .nc output file."
)
else:
save_path = self._filepath
bytes_buffer = data.to_netcdf(**self._save_args)

with self._fs.open(save_path, mode="wb") as fs_file:
fs_file.write(bytes_buffer)

self._invalidate_cache()

def _describe(self) -> dict[str, Any]:
return dict(
filepath=self._filepath,
protocol=self._protocol,
load_args=self._load_args,
save_args=self._save_args,
)

def _exists(self) -> bool:
load_path = self._filepath

if self._is_multifile:
files = self._fs.glob(load_path)
exists = True if files else False
else:
exists = self._fs.exists(load_path)

return exists

def _invalidate_cache(self):
"""Invalidate underlying filesystem caches."""
self._fs.invalidate_cache(self._filepath)

def __del__(self):
"""Cleanup temporary directory"""
if self._temppath is not None:
logger.info("Deleting local temporary files.")
temp_filepath = self._temppath / PurePosixPath(self._filepath).stem
if self._is_multifile:
temp_files = glob(str(temp_filepath))
for file in temp_files:
try:
Path(file).unlink()
except FileNotFoundError: # pragma: no cover
pass # pragma: no cover
else:
temp_filepath = (
str(temp_filepath) + "/" + PurePosixPath(self._filepath).name
)
try:
Path(temp_filepath).unlink()
except FileNotFoundError:
pass
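Putting the pieces together, a round trip with the new dataset might look like the sketch below. File paths, bucket name, and credential values are illustrative only; the remote example needs `temppath` because remote files are synced to local storage before `xarray` opens them.

    >>> import xarray as xr
    >>> from kedro_datasets.netcdf import NetCDFDataset
    >>>
    >>> ds = xr.Dataset({"temperature": ("time", [10.2, 11.5, 9.8])})
    >>> local = NetCDFDataset(filepath="data/weather.nc")  # "file" protocol, temppath not required
    >>> local.save(ds)  # serialised via Dataset.to_netcdf and written through fsspec
    >>> reloaded = local.load()  # read back with xr.open_dataset
    >>>
    >>> # Globbed, remote multi-file read: "*" switches loading to xr.open_mfdataset;
    >>> # such a dataset can only be loaded, never saved.
    >>> remote = NetCDFDataset(
    ...     filepath="s3://my-bucket/weather/*.nc",  # hypothetical bucket
    ...     temppath="/tmp/netcdf_cache",
    ...     load_args={"concat_dim": "time", "combine": "nested"},
    ...     credentials={"key": "<aws-access-key>", "secret": "<aws-secret-key>"},
    ... )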
2 changes: 1 addition & 1 deletion kedro-datasets/pyproject.toml
@@ -32,7 +32,7 @@ version = {attr = "kedro_datasets.__version__"}
fail_under = 100
show_missing = true
# temporarily ignore kedro_datasets/__init__.py in coverage report
omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*", "kedro_datasets/tensorflow/*", "kedro_datasets/__init__.py", "kedro_datasets/conftest.py", "kedro_datasets/databricks/*"]
omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/netcdf/*", "kedro_datasets/snowflake/*", "kedro_datasets/tensorflow/*", "kedro_datasets/__init__.py", "kedro_datasets/conftest.py", "kedro_datasets/databricks/*"]
exclude_lines = ["pragma: no cover", "raise NotImplementedError", "if TYPE_CHECKING:"]

[tool.pytest.ini_options]
10 changes: 10 additions & 0 deletions kedro-datasets/setup.py
@@ -31,6 +31,14 @@ def _collect_requirements(requires):
}
matplotlib_require = {"matplotlib.MatplotlibWriter": ["matplotlib>=3.0.3, <4.0"]}
matlab_require = {"matlab.MatlabDataset": ["scipy"]}
netcdf_require = {
"netcdf.NetCDFDataset": [
"h5netcdf>=1.2.0",
"netcdf4>=1.6.4",
"xarray<=0.20.2; python_version == '3.7'",
"xarray>=2023.1.0; python_version >= '3.8'",
]
}
networkx_require = {"networkx.NetworkXDataset": ["networkx~=2.4"]}
pandas_require = {
"pandas.CSVDataset": [PANDAS],
@@ -118,6 +126,7 @@ def _collect_requirements(requires):
"huggingface": _collect_requirements(huggingface_require),
"matlab": _collect_requirements(matlab_require),
"matplotlib": _collect_requirements(matplotlib_require),
"netcdf": _collect_requirements(netcdf_require),
"networkx": _collect_requirements(networkx_require),
"pandas": _collect_requirements(pandas_require),
"pickle": _collect_requirements(pickle_require),
@@ -235,6 +244,7 @@ def _collect_requirements(requires):
"tensorflow~=2.0; platform_system != 'Darwin' or platform_machine != 'arm64'",
"triad>=0.6.7, <1.0",
"trufflehog~=2.1",
"xarray>=2023.1.0",
"xlsxwriter~=1.0",
# huggingface
"datasets",
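With these extras in place, the new dataset's dependencies are expected to be installable through the `netcdf` extra, e.g. `pip install "kedro-datasets[netcdf]"`, which pulls in `h5netcdf`, `netcdf4` and a compatible `xarray` version; `xarray>=2023.1.0` is also added to the test requirements so the new test suite can run in CI.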