open_virtual_dataset with and without indexes #52

Merged · 16 commits · Mar 27, 2024
1 change: 1 addition & 0 deletions ci/environment.yml
@@ -9,6 +9,7 @@ dependencies:
- kerchunk
- netcdf4
- pydantic
- pooch
- pytest
- pytest-cov
- ruff
81 changes: 76 additions & 5 deletions docs/usage.md
@@ -175,7 +175,7 @@ ds1 = ds.isel(time=slice(None, 1460))
ds2 = ds.isel(time=slice(1460, None))

ds1.to_netcdf('air1.nc')
ds1.to_netcdf('air2.nc')
ds2.to_netcdf('air2.nc')
```

Note that we have created these in such a way that each dataset has one equally-sized chunk.
@@ -195,6 +195,16 @@ vds1 = open_virtual_dataset('air1.nc', indexes={})
vds2 = open_virtual_dataset('air2.nc', indexes={})
```

We can see that the datasets have no indexes.

```python
vds1.indexes
```
```
Indexes:
*empty*
```

```{note}
Passing `indexes={}` will only work if you use a [specific branch of xarray](https://github.com/TomNicholas/xarray/tree/concat-no-indexes), as it requires multiple in-progress PRs; see [GH issue #14](https://github.com/TomNicholas/VirtualiZarr/issues/14#issuecomment-2018369470).
```
@@ -249,20 +259,79 @@ In future we would like for it to be possible to just use `xr.open_mfdataset` to

vds = xr.open_mfdataset(
['air1.nc', 'air2.nc'],
combine='nested,
combine='nested',
concat_dim=['time'],
coords='minimal',
compat='override',
indexes={},
)

but this requires some [upstream changes](https://github.com/TomNicholas/VirtualiZarr/issues/35) in xarray.
```

### Automatic ordering using coordinate data

TODO: How to concatenate with order inferred from indexes automatically
Sometimes we don't have a priori knowledge of which files contain what content, and we would like to concatenate them in an order dictated by their coordinates (e.g. so that a `time` coordinate monotonically increases into the future).

For this we will actually want to create xarray indexes, so that we can use the values in them to determine the correct concatenation order. This requires loading coordinate values into memory, the same way that `xarray.open_dataset` does by default.

To open a virtual dataset but with in-memory indexes along 1D [dimension coordinates](), pass `indexes=None` to `open_virtual_dataset` (which is the default).

```python
vds1 = open_virtual_dataset('air1.nc')
vds2 = open_virtual_dataset('air2.nc')
```

Now we can see that some indexes have been created by default.

```python
vds1.xindexes
```
```
Indexes:
lat PandasIndex
lon PandasIndex
time PandasIndex
```

To use these indexes to infer concatenation order we can use `xarray.combine_by_coords`.

```python
combined_vds = xr.combine_by_coords([vds2, vds1])
combined_vds
```
```
<xarray.Dataset> Size: 8MB
Dimensions: (time: 2920, lat: 25, lon: 53)
Coordinates:
* lat (lat) float32 100B 75.0 72.5 70.0 67.5 65.0 ... 22.5 20.0 17.5 15.0
* lon (lon) float32 212B 200.0 202.5 205.0 207.5 ... 325.0 327.5 330.0
* time (time) datetime64[ns] 23kB 2013-01-01 ... 2014-12-31T18:00:00
Data variables:
air (time, lat, lon) int16 8MB ManifestArray<shape=(2920, 25, 53), d...
Attributes:
Conventions: COARDS
description: Data is from NMC initialized reanalysis\n(4x/day). These a...
platform: Model
references: http://www.esrl.noaa.gov/psd/data/gridded/data.ncep.reanaly...
title: 4x daily NMC reanalysis (1948)
```
We can see that, despite passing the datasets out of order, the time coordinate in the result is still ordered correctly.
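
As a quick sanity check (a minimal sketch, assuming the `combined_vds` from above), we can verify the ordering programmatically:

```python
# the combined time index should increase monotonically even though the
# inputs were passed out of order
assert combined_vds.xindexes["time"].to_pandas_index().is_monotonic_increasing
```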

Note that we can safely omit the `compat='override'` kwarg now, because we have indexes whose values will be compared.

TODO: Improve xarray's error message for the case where `combine_by_coords` is used without creating indexes first.

TODO: Note on how this could be done using `open_mfdataset(..., combine='by_coords')` in future
```{note}
In future we would like for it to be possible to just use `xr.open_mfdataset` to open the files and combine them in one go, e.g.

vds = xr.open_mfdataset(
['air2.nc', 'air1.nc'],
combine='by_coords',
)

but this requires some [upstream changes](https://github.com/TomNicholas/VirtualiZarr/issues/35) in xarray.
```

### Automatic ordering using metadata

@@ -282,7 +351,7 @@ To write out all the references in the virtual dataset as a single kerchunk-comp
combined_vds.virtualize.to_kerchunk('combined.json', format='json')
```

These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using its built-in kerchunk xarray backend.
These references can now be interpreted like they were a Zarr store by [fsspec](https://github.com/fsspec/filesystem_spec), using kerchunk's built-in xarray backend (so you need kerchunk to be installed to use `engine='kerchunk'`).

```python
import fsspec
@@ -295,4 +364,6 @@ combined_ds = xr.open_dataset(mapper, engine="kerchunk")
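
The collapsed lines above elide the mapper setup; a minimal sketch of the full read-back flow (an illustration only, assuming the `combined.json` written earlier and that kerchunk is installed) might look like:

```python
import fsspec
import xarray as xr

# expose the kerchunk reference JSON as a "reference" filesystem
fs = fsspec.filesystem("reference", fo="combined.json")
mapper = fs.get_mapper("")

# open the references lazily as a regular xarray Dataset
combined_ds = xr.open_dataset(mapper, engine="kerchunk")
```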

### Writing as Zarr

TODO: Write out references as a Zarr v3 store following the [Chunk Manifest ZEP](https://github.com/zarr-developers/zarr-specs/issues/287), see [PR #45](https://github.com/TomNicholas/VirtualiZarr/pull/45)

TODO: Explanation of how this requires changes in zarr upstream to be able to read it
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -33,7 +33,8 @@ test = [
"pre-commit",
"pytest-mypy",
"pytest",
"scipy"
"scipy",
"pooch",
]


81 changes: 81 additions & 0 deletions virtualizarr/tests/test_xarray.py
@@ -1,6 +1,11 @@
from typing import Mapping

import numpy as np
import pytest
import xarray as xr
from xarray.core.indexes import Index

from virtualizarr import open_virtual_dataset
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.zarr import ZArray

@@ -104,6 +109,7 @@
ds2 = xr.Dataset({"a": (["x", "y"], marr2)})

result = xr.concat([ds1, ds2], dim="x")["a"]
assert result.indexes == {}

assert result.shape == (2, 20)
assert result.chunks == (1, 10)
@@ -150,6 +156,7 @@
ds2 = xr.Dataset({"a": (["x", "y"], marr2)})

result = xr.concat([ds1, ds2], dim="z")["a"]
assert result.indexes == {}

# xarray.concat adds new dimensions along axis=0
assert result.shape == (2, 5, 20)
@@ -201,6 +208,7 @@
ds2 = xr.Dataset(coords=coords)

result = xr.concat([ds1, ds2], dim="t")["t"]
assert result.indexes == {}

assert result.shape == (40,)
assert result.chunks == (10,)
@@ -215,3 +223,76 @@
assert result.data.zarray.fill_value == zarray.fill_value
assert result.data.zarray.order == zarray.order
assert result.data.zarray.zarr_format == zarray.zarr_format


@pytest.fixture
def netcdf4_file(tmpdir):
# Set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature")

# Save it to disk as netCDF (in temporary directory)
filepath = f"{tmpdir}/air.nc"
ds.to_netcdf(filepath)

return filepath


@pytest.fixture
def netcdf4_files(tmpdir):
# Set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature")

# split into equal chunks so we can concatenate them back together later
ds1 = ds.isel(time=slice(None, 1460))
ds2 = ds.isel(time=slice(1460, None))

# Save it to disk as netCDF (in temporary directory)
filepath1 = f"{tmpdir}/air1.nc"
filepath2 = f"{tmpdir}/air2.nc"
ds1.to_netcdf(filepath1)
ds2.to_netcdf(filepath2)

return filepath1, filepath2


class TestOpenVirtualDatasetIndexes:
def test_no_indexes(self, netcdf4_file):
vds = open_virtual_dataset(netcdf4_file, indexes={})
assert vds.indexes == {}

def test_create_default_indexes(self, netcdf4_file):
vds = open_virtual_dataset(netcdf4_file, indexes=None)
ds = xr.open_dataset(netcdf4_file)

# TODO use xr.testing.assert_identical(vds.indexes, ds.indexes) instead once class supported by assertion comparison, see https://github.com/pydata/xarray/issues/5812
assert index_mappings_equal(vds.xindexes, ds.xindexes)


def index_mappings_equal(indexes1: Mapping[str, Index], indexes2: Mapping[str, Index]):
# Check if the mappings have the same keys
if set(indexes1.keys()) != set(indexes2.keys()):
return False

# Check if the values for each key are identical
for key in indexes1.keys():
index1 = indexes1[key]
index2 = indexes2[key]

if not index1.equals(index2):
return False

return True


class TestCombineUsingIndexes:
def test_combine_by_coords(self, netcdf4_files):
filepath1, filepath2 = netcdf4_files

vds1 = open_virtual_dataset(filepath1)
vds2 = open_virtual_dataset(filepath2)

combined_vds = xr.combine_by_coords(
[vds2, vds1],
)

assert combined_vds.xindexes["time"].to_pandas_index().is_monotonic_increasing
68 changes: 42 additions & 26 deletions virtualizarr/xarray.py
@@ -1,9 +1,10 @@
from typing import List, Literal, Optional, Union, overload
from typing import List, Literal, Mapping, Optional, Union, overload

import ujson # type: ignore
import xarray as xr
from xarray import register_dataset_accessor
from xarray.backends import BackendArray
from xarray.core.indexes import Index

import virtualizarr.kerchunk as kerchunk
from virtualizarr.kerchunk import KerchunkStoreRefs
@@ -20,13 +21,15 @@ def open_virtual_dataset(
filepath: str,
filetype: Optional[str] = None,
drop_variables: Optional[List[str]] = None,
indexes: Optional[Mapping[str, Index]] = None,
virtual_array_class=ManifestArray,
indexes={},
) -> xr.Dataset:
"""
Open a file or store as an xarray Dataset wrapping virtualized zarr arrays.

It's important that we avoid creating any IndexVariables, as our virtualized zarr array objects don't actually contain a collection that can be turned into a pandas.Index.
No data variables will be loaded.

Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass indexes={}.

Parameters
----------
@@ -38,27 +41,39 @@
If not provided will attempt to automatically infer the correct filetype from the filepath's extension.
drop_variables: list[str], default is None
Variables in the file to drop before returning.
indexes : Mapping[str, Index], default is None
Indexes to use on the returned xarray Dataset.
Default is None, which will read any 1D coordinate data to create in-memory Pandas indexes.
To avoid creating any indexes, pass indexes={}.
virtual_array_class
Virtual array class to use to represent the references to the chunks in each on-disk array.
Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
"""

# this is the only place we actually always need to use kerchunk directly
ds_refs = kerchunk.read_kerchunk_references_from_file(
vds_refs = kerchunk.read_kerchunk_references_from_file(
filepath=filepath,
filetype=filetype,
)

ds = dataset_from_kerchunk_refs(
ds_refs,
if indexes is None:
# add default indexes by reading data from file
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
ds = xr.open_dataset(filepath)
indexes = ds.xindexes
ds.close()

vds = dataset_from_kerchunk_refs(
vds_refs,
drop_variables=drop_variables,
virtual_array_class=virtual_array_class,
indexes=indexes,
)

Review comment (Member Author) on lines +54 to 72:

We're literally opening the file twice here - once with kerchunk to read all the byte ranges, and then optionally once again to read in the values to use to create default pandas indexes with xarray.

Wondering if you have any thoughts on how hard it might be to consolidate these, @jhamman?

# TODO we should probably also use ds.set_close() to tell xarray how to close the file we opened
# TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened

return ds
return vds


def dataset_from_kerchunk_refs(
Expand Down Expand Up @@ -86,14 +101,9 @@ def dataset_from_kerchunk_refs(

vars = {}
for var_name in var_names_to_keep:
# TODO abstract all this parsing into a function/method?
arr_refs = kerchunk.extract_array_refs(refs, var_name)
chunk_dict, zarray, zattrs = kerchunk.parse_array_refs(arr_refs)
manifest = ChunkManifest.from_kerchunk_chunk_dict(chunk_dict)
dims = zattrs["_ARRAY_DIMENSIONS"]

varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest)
vars[var_name] = xr.Variable(data=varr, dims=dims, attrs=zattrs)
vars[var_name] = variable_from_kerchunk_refs(
refs, var_name, virtual_array_class
)

data_vars, coords = separate_coords(vars, indexes)

@@ -109,6 +119,20 @@
return ds


def variable_from_kerchunk_refs(
refs: KerchunkStoreRefs, var_name: str, virtual_array_class
) -> xr.Variable:
"""Create a single xarray Variable by reading specific keys of a kerchunk references dict."""

arr_refs = kerchunk.extract_array_refs(refs, var_name)
chunk_dict, zarray, zattrs = kerchunk.parse_array_refs(arr_refs)
manifest = ChunkManifest.from_kerchunk_chunk_dict(chunk_dict)
dims = zattrs["_ARRAY_DIMENSIONS"]
varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest)

return xr.Variable(data=varr, dims=dims, attrs=zattrs)


def separate_coords(
vars: dict[str, xr.Variable],
indexes={},
@@ -121,6 +145,7 @@

# this would normally come from CF decoding, let's hope the fact we're skipping that doesn't cause any problems...
coord_names: List[str] = []

# split data and coordinate variables (promote dimension coordinates)
data_vars = {}
coord_vars = {}
@@ -135,16 +160,7 @@
else:
data_vars[name] = var

# this is stolen from https://github.com/pydata/xarray/pull/8051
# needed otherwise xarray errors whilst trying to turn the KerchunkArrays for the 1D coordinate variables into indexes
# but it doesn't appear to work with `main` since #8107, which is why the workaround above is needed
# EDIT: actually even the workaround doesn't work - to avoid creating indexes I had to checkout xarray v2023.08.0, the last one before #8107 was merged
set_indexes = False
if set_indexes:
coords = coord_vars
else:
# explict Coordinates object with no index passed
coords = xr.Coordinates(coord_vars, indexes=indexes)
coords = xr.Coordinates(coord_vars, indexes=indexes)

return data_vars, coords
