Make data fetching scalable with Dask #392

Merged: 60 commits, merged on Oct 15, 2024
Changes from 44 commits

Commits (60)
56866b3
ongoing
gmaze Sep 17, 2024
2573bd8
Merge branch 'other-major-breaking-refactoring' into dask-ok
gmaze Sep 17, 2024
b6b4e96
more stuff trying to make the processing chaining pickelizable
gmaze Sep 18, 2024
58a0bcf
clear CI tests data
gmaze Sep 25, 2024
92b7156
Add Ci tests data
gmaze Sep 25, 2024
6b4b64a
Merge branch 'master' into dask-ok
gmaze Sep 25, 2024
f285e05
Update fetchers.py
gmaze Sep 26, 2024
944f56e
Serializable erddap processor
gmaze Sep 26, 2024
5f34187
Remove env caching in CI tests
gmaze Sep 26, 2024
af4fa3d
remove cache
gmaze Sep 26, 2024
7fc3613
Create test-mamba.yml
gmaze Sep 26, 2024
a9f0e7c
Merge branch 'master' into dask-ok
gmaze Sep 26, 2024
aafc79b
pro to pre refactor
gmaze Sep 26, 2024
e7a47c7
Update erddap_data.py
gmaze Sep 26, 2024
6f74686
Update erddap_data_processors.py
gmaze Sep 26, 2024
4f688a8
serializable gdac pre-processor
gmaze Sep 26, 2024
664716f
Update filesystems.py
gmaze Sep 27, 2024
1287cba
fix doc
gmaze Sep 27, 2024
5e5f22a
Update casting.py
gmaze Sep 27, 2024
7ba07ca
Refactor argovis pre-processor
gmaze Sep 27, 2024
594434f
Update erddap_data_processors.py
gmaze Sep 27, 2024
3f4a986
Update filesystems.py
gmaze Sep 27, 2024
64e42c2
Update envs_manager
gmaze Sep 30, 2024
8faeb56
New "parallel" and "parallel_default_method" options
gmaze Oct 1, 2024
f148b71
Improve facade handling of options
gmaze Oct 1, 2024
47e370e
Update "performance" page documentation
gmaze Oct 1, 2024
a58f980
Merge branch 'master' into dask-ok
gmaze Oct 1, 2024
484b869
Update fetchers.py
gmaze Oct 1, 2024
3db5d65
Clean data fetchers with new "parallel" option design
gmaze Oct 1, 2024
ad88757
New PARALLEL_SETUP option method
gmaze Oct 1, 2024
60bef67
Update filesystems.py
gmaze Oct 1, 2024
642aadd
Update performances.rst
gmaze Oct 1, 2024
706a7da
Update CI tests
gmaze Oct 1, 2024
358d2bf
Update CI tests data
gmaze Oct 2, 2024
bdb0ed8
Update performances.rst
gmaze Oct 2, 2024
164c97d
Merge branch 'master' into dask-ok
gmaze Oct 2, 2024
bd1a507
fix icons
gmaze Oct 2, 2024
cf444c5
Update test_fetchers_data_erddap_bgc.py
gmaze Oct 2, 2024
e8c5295
Update fetchers.py
gmaze Oct 2, 2024
01e054e
argo accessor to use datamode extension
gmaze Oct 2, 2024
7c03833
Update proto.py
gmaze Oct 2, 2024
e668a2f
Update performances.rst
gmaze Oct 2, 2024
da3e457
Update whatsnew
gmaze Oct 3, 2024
0a8f51c
Update doc [skip-ci]
gmaze Oct 3, 2024
cb462e2
Improve doc and docstrings
gmaze Oct 9, 2024
fcfd815
Update xarray.py
gmaze Oct 9, 2024
533bede
New read_dac_wmo method to ArgoIndex
gmaze Oct 9, 2024
14ec6a7
Fix CI tests
gmaze Oct 9, 2024
3a294e8
Clean erddap to_xarray arg management
gmaze Oct 10, 2024
e72cb10
Fix CI tests
gmaze Oct 10, 2024
1757967
Merge branch 'master' into dask-ok
gmaze Oct 10, 2024
3eb65b3
Fix cache dir facade bug
gmaze Oct 10, 2024
0d5e422
Update whats-new.rst
gmaze Oct 10, 2024
a26fa06
Update pytests.yml
gmaze Oct 10, 2024
4f7f536
Merge branch 'master' into dask-ok
gmaze Oct 11, 2024
e7512c1
Fix/update CI tests data
gmaze Oct 11, 2024
aac32bc
Add CI tests for Dask client parallelization method
gmaze Oct 11, 2024
07922b2
Update test_fetchers_dask_cluster.py
gmaze Oct 11, 2024
49fd61e
Merge branch 'master' into dask-ok
gmaze Oct 15, 2024
b852a65
Update test_fetchers_dask_cluster.py
gmaze Oct 15, 2024
9 changes: 5 additions & 4 deletions .github/workflows/pytests-upstream.yml
@@ -102,6 +102,7 @@ jobs:
run: |
echo "CONDA_ENV_FILE=ci/requirements/py${{matrix.python-version}}-core-free.yml" >> $GITHUB_ENV
echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV
echo "LOG_FILE=argopy-tests-Core-Free-Py${{matrix.python-version}}-${{matrix.os}}.log" >> $GITHUB_ENV

- name: Setup Micromamba ${{ matrix.python-version }}
uses: mamba-org/setup-micromamba@v1
@@ -110,8 +111,8 @@
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}

@@ -237,8 +238,8 @@ jobs:
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}
pytest-reportlog
8 changes: 4 additions & 4 deletions .github/workflows/pytests.yml
@@ -88,8 +88,8 @@ jobs:
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}

@@ -211,8 +211,8 @@ jobs:
environment-name: argopy-tests
environment-file: ${{ env.CONDA_ENV_FILE }}
init-shell: bash
cache-environment: true
cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
cache-environment: false
# cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
create-args: >-
python=${{matrix.python-version}}

4 changes: 2 additions & 2 deletions argopy/data_fetchers/__init__.py
@@ -13,7 +13,7 @@
"erddap_data",
"erddap_index",
"argovis_data",
"gdac_data.py",
"gdac_index.py",
"gdac_data",
"gdac_index",
"CTDRefDataFetcher",
)
161 changes: 26 additions & 135 deletions argopy/data_fetchers/argovis_data.py
@@ -7,12 +7,13 @@
import warnings

from ..stores import httpstore
from ..options import OPTIONS, DEFAULT
from ..options import OPTIONS, DEFAULT, VALIDATE, PARALLEL_SETUP
from ..utils.format import format_oneline
from ..utils.chunking import Chunker
from ..utils.decorators import deprecated
from ..errors import DataNotFound
from .proto import ArgoDataFetcherProto

from .argovis_data_processors import pre_process, add_attributes

access_points = ["wmo", "box"]
exit_formats = ["xarray"]
Expand Down Expand Up @@ -49,7 +50,6 @@ def __init__(
cache: bool = False,
cachedir: str = "",
parallel: bool = False,
parallel_method: str = "thread",
progress: bool = False,
chunks: str = "auto",
chunks_maxsize: dict = {},
@@ -66,10 +66,13 @@
Cache data or not (default: False)
cachedir: str (optional)
Path to cache folder
parallel: bool (optional)
Chunk request to use parallel fetching (default: False)
parallel_method: str (optional)
Define the parallelization method: ``thread``, ``process`` or a :class:`dask.distributed.client.Client`.
parallel: bool, str, :class:`distributed.Client`, default: False
Set whether to use parallelization or not, and possibly which method to use.

Possible values:
- ``False``: no parallelization is used
- ``True``: use default method specified by the ``parallel_default_method`` option
- any other values accepted by the ``parallel_default_method`` option
progress: bool (optional)
Show a progress bar or not when ``parallel`` is set to True.
chunks: 'auto' or dict of integers (optional)
@@ -96,16 +99,8 @@
}
self.fs = kwargs["fs"] if "fs" in kwargs else httpstore(**self.store_opts)

if not isinstance(parallel, bool):
parallel_method = parallel
parallel = True
if parallel_method not in ["thread"]:
raise ValueError(
"argovis only support multi-threading, use 'thread' instead of '%s'"
% parallel_method
)
self.parallel = parallel
self.parallel_method = parallel_method
self.parallelize, self.parallel_method = PARALLEL_SETUP(parallel)

self.progress = progress
self.chunks = chunks
self.chunks_maxsize = chunks_maxsize
@@ -149,105 +144,6 @@ def _add_history(self, this, txt):
this.attrs["history"] = txt
return this

def _add_attributes(self, this): # noqa: C901
"""Add variables attributes not return by argovis requests

#todo: This is hard coded, but should be retrieved from an API somewhere
"""
for v in this.data_vars:
if "TEMP" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "SEA TEMPERATURE IN SITU ITS-90 SCALE",
"standard_name": "sea_water_temperature",
"units": "degree_Celsius",
"valid_min": -2.0,
"valid_max": 40.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "PSAL" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "PRACTICAL SALINITY",
"standard_name": "sea_water_salinity",
"units": "psu",
"valid_min": 0.0,
"valid_max": 43.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "PRES" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "Sea Pressure",
"standard_name": "sea_water_pressure",
"units": "decibar",
"valid_min": 0.0,
"valid_max": 12000.0,
"resolution": 0.1,
"axis": "Z",
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "DOXY" in v and "_QC" not in v:
this[v].attrs = {
"long_name": "Dissolved oxygen",
"standard_name": "moles_of_oxygen_per_unit_mass_in_sea_water",
"units": "micromole/kg",
"valid_min": -5.0,
"valid_max": 600.0,
"resolution": 0.001,
}
if "ERROR" in v:
this[v].attrs["long_name"] = (
"ERROR IN %s" % this[v].attrs["long_name"]
)

for v in this.data_vars:
if "_QC" in v:
attrs = {
"long_name": "Global quality flag of %s profile" % v,
"convention": "Argo reference table 2a",
}
this[v].attrs = attrs

if "CYCLE_NUMBER" in this.data_vars:
this["CYCLE_NUMBER"].attrs = {
"long_name": "Float cycle number",
"convention": "0..N, 0 : launch cycle (if exists), 1 : first complete cycle",
}

if "DATA_MODE" in this.data_vars:
this["DATA_MODE"].attrs = {
"long_name": "Delayed mode or real time data",
"convention": "R : real time; D : delayed mode; A : real time with adjustment",
}

if "DIRECTION" in this.data_vars:
this["DIRECTION"].attrs = {
"long_name": "Direction of the station profiles",
"convention": "A: ascending profiles, D: descending profiles",
}

if "PLATFORM_NUMBER" in this.data_vars:
this["PLATFORM_NUMBER"].attrs = {
"long_name": "Float unique identifier",
"convention": "WMO float identifier : A9IIIII",
}

return this

@property
def cachepath(self):
@@ -269,6 +165,7 @@ def safe_for_fsspec_cache(url):

return [safe_for_fsspec_cache(url) for url in urls]

@deprecated('Not serializable')
def json2dataframe(self, profiles):
"""convert json data to Pandas DataFrame"""
# Make sure we deal with a list
@@ -312,38 +209,34 @@ def json2dataframe(self, profiles):
df = pd.DataFrame(rows)
return df

def to_dataframe(self, errors: str = "ignore"):
def to_dataframe(self, errors: str = "ignore") -> pd.DataFrame:
"""Load Argo data and return a Pandas dataframe"""

# Download data:
if not self.parallel:
method = "sequential"
else:
method = self.parallel_method
preprocess_opts = {'key_map': self.key_map}
df_list = self.fs.open_mfjson(
self.uri,
method=method,
preprocess=self.json2dataframe,
method=self.parallel_method,
preprocess=pre_process,
preprocess_opts=preprocess_opts,
progress=self.progress,
errors=errors,
)

# Merge results (list of dataframe):
for i, df in enumerate(df_list):
df = df.reset_index()
df = df.rename(columns=self.key_map)
df = df[[value for value in self.key_map.values() if value in df.columns]]
df_list[i] = df
df = pd.concat(df_list, ignore_index=True)
if df.shape[0] == 0:
raise DataNotFound("No data found for: %s" % self.cname())

df.sort_values(by=["TIME", "PRES"], inplace=True)
df["N_POINTS"] = np.arange(0, len(df["N_POINTS"]))
df = df.set_index(["N_POINTS"])
return df

def to_xarray(self, errors: str = "ignore"):
def to_xarray(self, errors: str = "ignore") -> xr.Dataset:
"""Download and return data as xarray Datasets"""
ds = self.to_dataframe(errors=errors).to_xarray()
# ds["TIME"] = pd.to_datetime(ds["TIME"], utc=True)
ds = ds.sortby(
["TIME", "PRES"]
) # should already be sorted by date in descending order
@@ -352,7 +245,6 @@ def to_xarray(self, errors: str = "ignore"):
) # Re-index to avoid duplicate values

# Set coordinates:
# ds = ds.set_coords('N_POINTS')
coords = ("LATITUDE", "LONGITUDE", "TIME", "N_POINTS")
ds = ds.reset_coords()
ds["N_POINTS"] = ds["N_POINTS"]
@@ -361,12 +253,11 @@
ds = ds.rename({v: v.upper()})
ds = ds.set_coords(coords)

# Cast data types and add variable attributes (not available in the csv download):
ds["TIME"] = pd.to_datetime(ds["TIME"], utc=True)
ds = self._add_attributes(ds)
# Add variable attributes and cast data types:
ds = add_attributes(ds)
ds = ds.argo.cast_types()

# Remove argovis file attributes and replace them with argopy ones:
# Remove argovis dataset attributes and replace them with argopy ones:
ds.attrs = {}
if self.dataset_id == "phy":
ds.attrs["DATA_ID"] = "ARGO"
@@ -520,7 +411,7 @@ def uri(self):
MaxLen = np.timedelta64(MaxLenTime, "D")

urls = []
if not self.parallel:
if not self.parallelize:
# Check if the time range is not larger than allowed (MaxLenTime days):
if Lt > MaxLen:
self.Chunker = Chunker(
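The docstring change above redefines the ``parallel`` argument: it can now be ``False``, ``True`` (fall back to the method set by the ``parallel_default_method`` option), or any other accepted method value, including a Dask client. Below is a minimal usage sketch of that design; the exact facade calls and option keys are assumptions based on the docstrings and commit messages in this PR (8faeb56, 3db5d65), not code taken from the diff.

```python
import argopy
from distributed import Client

# Let argopy pick the parallelization method from the library-wide default:
with argopy.set_options(parallel_default_method="thread"):
    f = argopy.DataFetcher(src="argovis", parallel=True)
    ds = f.region([-75, -45, 20, 30, 0, 100, "2020-01", "2020-06"]).to_xarray()

# Or hand the fetcher a Dask distributed client directly:
client = Client(processes=False)
f = argopy.DataFetcher(src="erddap", parallel=client)
ds = f.float(6902746).to_xarray()
```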
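The argovis fetcher also replaces its old ``parallel``/``parallel_method`` bookkeeping with a single call, ``self.parallelize, self.parallel_method = PARALLEL_SETUP(parallel)``, where ``PARALLEL_SETUP`` is imported from ``..options`` (commit ad88757). Its implementation is not part of the hunks shown here; the following is only a plausible sketch of such a normalizer, inferred from the call site and the documented accepted values, not the actual argopy code.

```python
from typing import Tuple, Union

def parallel_setup(parallel, default_method: str = "thread") -> Tuple[bool, Union[str, object]]:
    """Hypothetical normalizer turning a user-facing 'parallel' value
    into a (parallelize, method) pair."""
    if parallel is False:
        # No parallelization: fetchers fall back to sequential downloads.
        return False, "sequential"
    if parallel is True:
        # Defer to the library-wide default ('parallel_default_method' option).
        return True, default_method
    # Any other value ("thread", "process", a dask.distributed Client, ...)
    # is taken as the parallelization method itself.
    return True, parallel
```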