Add a fetcher for AWS S3 Argo GDAC data #385

Status: Open. Wants to merge 52 commits into base: master. Changes from all commits are shown below.

Commits (52):
d09c59c  [skip-ci] (gmaze, Sep 3, 2024)
a7cab73  Add S3 support to gdac data fetcher (gmaze, Sep 13, 2024)
215acf5  misc (gmaze, Sep 13, 2024)
2c723f1  Merge branch 'other-major-breaking-refactoring' into 302-new-data-sou… (gmaze, Sep 13, 2024)
55b3723  Update test_fetchers_data_gdac.py (gmaze, Sep 13, 2024)
8ebd896  misc (gmaze, Sep 18, 2024)
6747989  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Sep 18, 2024)
c812a9d  Squashed commit of the following: (gmaze, Sep 25, 2024)
40a92a6  clear tests data (gmaze, Sep 25, 2024)
2b992c3  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Sep 25, 2024)
3afafea  Create kerchunker.py (gmaze, Oct 23, 2024)
6b6faf9  Merge branch 'argo-kerchunk' into 302-new-data-source-for-gdac-from-a… (gmaze, Oct 23, 2024)
2d77293  misc (gmaze, Oct 23, 2024)
bfb0482  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Oct 23, 2024)
0a4b29c  Update options.py (gmaze, Oct 23, 2024)
040e565  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Oct 23, 2024)
0f01c3c  misc (gmaze, Oct 23, 2024)
c6f4186  Update kerchunker.py (gmaze, Oct 24, 2024)
a3a5d2d  More docstrings / docs (gmaze, Oct 24, 2024)
436ccc7  Update gdac_data.py (gmaze, Oct 24, 2024)
22411b9  Update lists.py (gmaze, Oct 24, 2024)
fb5e550  Update kerchunker.py (gmaze, Oct 24, 2024)
04aa0ed  Clear test data (gmaze, Oct 24, 2024)
caf0bbb  misc (gmaze, Oct 24, 2024)
c7220fa  misc (gmaze, Oct 24, 2024)
dda1eed  misc (gmaze, Oct 24, 2024)
6a5a532  Add support lazy http open_dataset (gmaze, Oct 24, 2024)
1b2ca31  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Oct 25, 2024)
dca8524  fix bug in open_dataset (gmaze, Oct 25, 2024)
23f5f52  Update filesystems.py (gmaze, Oct 25, 2024)
5afa8cd  Fix 3.10 env for CI tests (gmaze, Oct 25, 2024)
4186506  Improve perf and docstrings (gmaze, Nov 6, 2024)
de5c149  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Dec 12, 2024)
8f23323  Update check_gdac_option and check_gdac_path to accomodate s3 (gmaze, Dec 13, 2024)
cef13d0  Make sure s3store is instanciated with the anon argument set (gmaze, Dec 13, 2024)
fef2e61  misc (gmaze, Dec 13, 2024)
f66138e  New gdacfs (gmaze, Dec 16, 2024)
5e0e49f  check_gdac_path/check_gdac_option to use gdacfs (gmaze, Dec 16, 2024)
c187e34  Update test_fetchers_data_gdac.py (gmaze, Dec 16, 2024)
3b2e820  Merge branch 'master' into 302-new-data-source-for-gdac-from-amazon-s3 (gmaze, Dec 16, 2024)
4301a5e  Update gdac_index.py (gmaze, Dec 17, 2024)
9a9a970  New gdacfs and ArgoKerchuncker (gmaze, Dec 17, 2024)
9056665  Improve docstrings, doc and blackify (gmaze, Dec 17, 2024)
d8f48a4  New utility to list official GDAC servers (gmaze, Dec 17, 2024)
2240c6a  Update test_fetchers_data_gdac.py (gmaze, Dec 18, 2024)
578fb5b  Improve log, doc (gmaze, Dec 19, 2024)
68e90fb  Update filesystems.py (gmaze, Dec 19, 2024)
23c49cd  Update locals.py (gmaze, Dec 19, 2024)
450ba25  Update py3.10 env def (gmaze, Dec 19, 2024)
8a260bf  Update filesystems.py (gmaze, Dec 19, 2024)
bc85efd  Improve docstrings (gmaze, Dec 19, 2024)
3bb9091  Improve documentation (gmaze, Dec 19, 2024)
argopy/data_fetchers/erddap_data.py: 30 changes (10 additions, 20 deletions)
@@ -96,7 +96,7 @@ def __init__( # noqa: C901

Parameters
----------
ds: str (optional)
ds: str, default = OPTIONS['ds']
Dataset to load: 'phy' or 'ref' or 'bgc-s'
cache: bool (optional)
Cache data or not (default: False)
@@ -128,6 +128,13 @@ def __init__( # noqa: C901
List of BGC essential variables that can't be NaN. If set to 'all', this is an easy way to reduce the size of the
:class:`xr.DataSet`` to points where all variables have been measured. Otherwise, provide a simple list of
variables.

Other parameters
----------------
server: str, default = OPTIONS['erddap']
URL to erddap server
mode: str, default = OPTIONS['mode']

"""
timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout
self.definition = "Ifremer erddap Argo data fetcher"
@@ -707,11 +714,9 @@ def to_xarray( # noqa: C901
-------
:class:`xarray.Dataset`
"""

URI = self.uri # Call it once

# Should we compute (from the index) and add DATA_MODE for BGC variables:
add_dm = self.dataset_id in ["bgc", "bgc-s"] if add_dm is None else bool(add_dm)
# Pre-processor options:
preprocess_opts = {
"add_dm": False,
"URI": URI,
@@ -725,7 +730,7 @@
"indexfs": self.indexfs,
}

# Download data
# Download and pre-process data:
results = []
if not self.parallelize:
if len(URI) == 1:
@@ -840,21 +845,6 @@ def to_xarray( # noqa: C901
[filtered.append(self.filter_measured(r)) for r in results]
results = filtered

# empty = []
# for v in self._bgc_vlist_measured:
# if v in results and np.count_nonzero(results[v]) != len(results["N_POINTS"]):
# empty.append(v)
# if len(empty) > 0:
# msg = (
# "After processing, your BGC request returned final data with NaNs (%s). "
# "This may be due to the 'measured' argument ('%s') that imposes a no-NaN constraint "
# "impossible to fulfill for the access point defined (%s)]. "
# "\nUsing the 'measured' argument, you can try to minimize the list of variables to "
# "return without NaNs, or set it to 'None' to return all samples."
# % (",".join(to_list(v)), ",".join(self._bgc_measured), self.cname())
# )
# raise ValueError(msg)

if concat and results is not None:
results["N_POINTS"] = np.arange(0, len(results["N_POINTS"]))

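The docstring changes above make `ds`, `server` and `mode` default to their `OPTIONS` counterparts for the erddap fetcher. A minimal usage sketch through argopy's public DataFetcher facade, relying on the default erddap server; the option values and the region box are illustrative only, not taken from this diff:

import argopy
from argopy import DataFetcher

# Illustrative only: fetch core ('phy') data from the default erddap server,
# with the user mode set through the global OPTIONS rather than fetcher kwargs.
with argopy.set_options(mode="standard"):
    f = DataFetcher(src="erddap", ds="phy")
    ds = f.region([-75, -45, 20, 30, 0, 100, "2011-01", "2011-06"]).to_xarray()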
argopy/data_fetchers/gdac_data.py: 174 changes (111 additions, 63 deletions)
@@ -12,12 +12,13 @@
import warnings
import getpass
import logging
from typing import Literal

from ..utils.format import argo_split_path
from ..utils.decorators import deprecated
from ..options import OPTIONS, check_gdac_path, PARALLEL_SETUP
from ..options import OPTIONS, check_gdac_option, PARALLEL_SETUP
from ..errors import DataNotFound
from ..stores import ArgoIndex
from ..stores import ArgoIndex, has_distributed, distributed
from .proto import ArgoDataFetcherProto
from .gdac_data_processors import pre_process_multiprof, filter_points

@@ -56,24 +57,21 @@ def init(self, *args, **kwargs):
###
def __init__(
self,
gdac: str = "",
ds: str = "",
cache: bool = False,
cachedir: str = "",
dimension: str = "point",
errors: str = "raise",
parallel: bool = False,
progress: bool = False,
dimension: Literal['point', 'profile'] = "point",
errors: str = "raise",
api_timeout: int = 0,
**kwargs
):
"""Init fetcher

Parameters
----------
gdac: str (optional)
Path to the local or remote directory where the 'dac' folder is located
ds: str (optional)
ds: str, default = OPTIONS['ds']
Dataset to load: 'phy' or 'bgc'
cache: bool (optional)
Cache data or not (default: False)
@@ -97,21 +95,28 @@ def __init__(
Show a progress bar or not when fetching data.
api_timeout: int (optional)
Server request time out in seconds. Set to OPTIONS['api_timeout'] by default.

Other parameters
----------------
gdac: str, default = OPTIONS['gdac']
Path to the local or remote directory where the 'dac' folder is located
"""
self.timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout
self.dataset_id = OPTIONS["ds"] if ds == "" else ds
self.server = kwargs["gdac"] if "gdac" in kwargs else OPTIONS["gdac"]
self.user_mode = kwargs["mode"] if "mode" in kwargs else OPTIONS["mode"]
self.server = OPTIONS["gdac"] if gdac == "" else gdac

self.errors = errors
self.dimension = dimension

# Validate server, raise GdacPathError if not valid.
check_gdac_path(self.server, errors="raise")
check_gdac_option(self.server, errors="raise")

index_file = "core"
if self.dataset_id in ["bgc-s", "bgc-b"]:
index_file = self.dataset_id

# Validation of self.server is done by the ArgoIndex:
# Validation of self.server is done by the ArgoIndex instance:
self.indexfs = ArgoIndex(
host=self.server,
index_file=index_file,
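Server validation now goes through `check_gdac_option`, which the commit history says was updated to accommodate s3 paths. A hedged sketch of calling it directly; the import paths follow the diff, the s3 URL is a placeholder and the return behaviour is assumed:

from argopy.options import check_gdac_option
from argopy.errors import GdacPathError

for host in ["https://data-argo.ifremer.fr", "s3://example-argo-gdac/pub"]:
    try:
        # Assumed behaviour: passes quietly when the host exposes a 'dac' folder,
        # raises GdacPathError otherwise (errors="raise").
        check_gdac_option(host, errors="raise")
        print(host, "is a valid GDAC path")
    except GdacPathError as e:
        print(host, "rejected:", e)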
@@ -146,10 +151,14 @@ def __repr__(self):
else:
summary.append("📕 Index: %s (not loaded)" % self.indexfs.index_file)
if hasattr(self.indexfs, "search"):
match = "matches" if self.N_FILES > 1 else "match"
match = "matches" if self.indexfs.N_MATCH > 1 else "match"
summary.append(
"📸 Index searched: True (%i %s, %0.4f%%)"
% (self.N_FILES, match, self.N_FILES * 100 / self.N_RECORDS)
% (
self.indexfs.N_MATCH,
match,
self.indexfs.N_MATCH * 100 / self.N_RECORDS,
)
)
else:
summary.append("📷 Index searched: False")
@@ -307,7 +316,13 @@ def _preprocess_multiprof(self, ds):
def pre_process(self, ds, *args, **kwargs):
return pre_process_multiprof(ds, *args, **kwargs)

def to_xarray(self, errors: str = "ignore"):
def to_xarray(
self,
errors: str = "ignore",
concat: bool = True,
concat_method: Literal["drop", "fill"] = "fill",
dimension: Literal['point', 'profile'] = "",
):
"""Load Argo data and return a :class:`xarray.Dataset`

Parameters
@@ -323,18 +338,22 @@ def to_xarray(self, errors: str = "ignore"):
-------
:class:`xarray.Dataset`
"""
URI = self.uri # Call it once
dimension = self.dimension if dimension == "" else dimension

if (
len(self.uri) > 50
len(URI) > 50
and not self.parallelize
and self.parallel_method == "sequential"
):
warnings.warn(
"Found more than 50 files to load, this may take a while to process sequentially ! "
"Consider using another data source (eg: 'erddap') or the 'parallel=True' option to improve processing time."
)
elif len(self.uri) == 0:
elif len(URI) == 0:
raise DataNotFound("No data found for: %s" % self.indexfs.cname)

# Pre-processor options:
if hasattr(self, "BOX"):
access_point = "BOX"
access_point_opts = {"BOX": self.BOX}
@@ -344,59 +363,88 @@ def to_xarray(self, errors: str = "ignore"):
elif hasattr(self, "WMO"):
access_point = "WMO"
access_point_opts = {"WMO": self.WMO}
preprocess_opts = {
"access_point": access_point,
"access_point_opts": access_point_opts,
"pre_filter_points": self._post_filter_points,
"dimension": dimension,
}

# Download and pre-process data:
ds = self.fs.open_mfdataset(
self.uri,
method=self.parallel_method,
concat_dim="N_POINTS",
concat=True,
preprocess=pre_process_multiprof,
preprocess_opts={
"access_point": access_point,
"access_point_opts": access_point_opts,
"pre_filter_points": self._post_filter_points,
},
progress=self.progress,
errors=errors,
open_dataset_opts={
opts = {
"progress": self.progress,
"errors": errors,
"concat": concat,
"concat_dim": "N_POINTS",
"preprocess": pre_process_multiprof,
"preprocess_opts": preprocess_opts,
}
if self.parallel_method in ["thread"]:
opts["method"] = "thread"
opts["open_dataset_opts"] = {
"xr_opts": {"decode_cf": 1, "use_cftime": 0, "mask_and_scale": 1}
},
)
}

# Meta-data processing:
ds["N_POINTS"] = np.arange(
0, len(ds["N_POINTS"])
) # Re-index to avoid duplicate values
ds = ds.set_coords("N_POINTS")
ds = ds.sortby("TIME")

# Remove netcdf file attributes and replace them with simplified argopy ones:
if "Fetched_from" not in ds.attrs:
raw_attrs = ds.attrs
ds.attrs = {}
ds.attrs.update({"raw_attrs": raw_attrs})
if self.dataset_id == "phy":
ds.attrs["DATA_ID"] = "ARGO"
if self.dataset_id in ["bgc", "bgc-s"]:
ds.attrs["DATA_ID"] = "ARGO-BGC"
ds.attrs["DOI"] = "http://doi.org/10.17882/42182"
ds.attrs["Fetched_from"] = self.server
try:
ds.attrs["Fetched_by"] = getpass.getuser()
except: # noqa: E722
ds.attrs["Fetched_by"] = "anonymous"
ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime(
"%Y/%m/%d"
)
elif (self.parallel_method in ["process"]) | (
has_distributed
and isinstance(self.parallel_method, distributed.client.Client)
):
opts["method"] = self.parallel_method
opts["open_dataset_opts"] = {
"errors": "ignore",
"download_url_opts": {"errors": "ignore"},
}
opts["progress"] = False

results = self.fs.open_mfdataset(URI, **opts)

if concat and results is not None:
if self.progress:
print("Final post-processing of the merged dataset ...")
# results = pre_process_multiprof(results, **preprocess_opts)
results = results.argo.cast_types(overwrite=False)

# Meta-data processing for a single merged dataset:
results = results.assign_coords({'N_POINTS': np.arange(0, len(results['N_POINTS']))})
results = results.sortby("TIME")

# Remove netcdf file attributes and replace them with simplified argopy ones:
if "Fetched_from" not in results.attrs:
raw_attrs = results.attrs

results.attrs = {}
if "Processing_history" in raw_attrs:
results.attrs.update({"Processing_history": raw_attrs["Processing_history"]})
raw_attrs.pop("Processing_history")
results.argo.add_history("URI merged with '%s'" % concat_method)

results.attrs.update({"raw_attrs": raw_attrs})
if self.dataset_id == "phy":
results.attrs["DATA_ID"] = "ARGO"
if self.dataset_id in ["bgc", "bgc-s"]:
results.attrs["DATA_ID"] = "ARGO-BGC"
results.attrs["DOI"] = "http://doi.org/10.17882/42182"
results.attrs["Fetched_from"] = self.server
try:
results.attrs["Fetched_by"] = getpass.getuser()
except: # noqa: E722
results.attrs["Fetched_by"] = "anonymous"
results.attrs["Fetched_date"] = pd.to_datetime(
"now", utc=True
).strftime("%Y/%m/%d")

results.attrs["Fetched_constraints"] = self.cname()
if len(self.uri) == 1:
results.attrs["Fetched_uri"] = self.uri[0]
else:
results.attrs["Fetched_uri"] = ";".join(self.uri)

ds.attrs["Fetched_constraints"] = self.cname()
if len(self.uri) == 1:
ds.attrs["Fetched_uri"] = self.uri[0]
if concat:
results.attrs = dict(sorted(results.attrs.items()))
else:
ds.attrs["Fetched_uri"] = ";".join(self.uri)

return ds
for ds in results:
ds.attrs = dict(sorted(ds.attrs.items()))
return results

@deprecated(
"Not serializable, please use 'gdac_data_processors.filter_points'",
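Putting the pieces together, the point of this PR is that the `gdac` option, and therefore this fetcher, can now target an S3 GDAC mirror in addition to http or local copies. A possible end-to-end call, where the bucket URL is a placeholder and not taken from this diff:

import argopy
from argopy import DataFetcher

# Placeholder bucket name: point the 'gdac' option at an S3 mirror of the GDAC,
# then fetch a single float through the regular DataFetcher facade.
with argopy.set_options(gdac="s3://example-argo-gdac/pub"):
    ds = DataFetcher(src="gdac", ds="phy").float(6902746).to_xarray()
    print(ds.attrs.get("Fetched_from"))  # expected to echo the S3 server path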