Skip to content

Commit

Permalink
update variables
Browse files Browse the repository at this point in the history
  • Loading branch information
luabida committed Oct 17, 2024
1 parent 05c1d70 commit 5f10564
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 79 deletions.
86 changes: 45 additions & 41 deletions satellite/downloader/extract_reanalysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@
is not possible. Avoid using the current month.
"""

import logging
import os
from dotenv import load_dotenv
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Literal

import urllib3
from dotenv import load_dotenv
from cdsapi.api import Client
from requests.exceptions import RequestException

_GLOBE_AREA = {"N": 90.0, "W": -180.0, "S": -90.0, "E": 180.0}
_DATA_DIR = Path.home() / "copernicus_data"
Expand Down Expand Up @@ -78,7 +78,7 @@ def download_netcdf(
`cdsapi.Client()`. Data can be retrieved for a specific date or a
date range, usage:
download_netcdf('filename') -> downloads the last available date globalwide
download_netcdf('filename') -> downloads the last available date worldwide
download_netcdf('filename', date='2022-10-04')
Expand Down Expand Up @@ -129,11 +129,6 @@ def download_netcdf(
"https://cds.climate.copernicus.eu/user/USER"
)

conn = Client(
url="https://cds.climate.copernicus.eu/api",
key=cdsapi_token,
)

if locale and locale not in _LOCALES:
raise ValueError(f"locale {locale} not supported. Options: {_LOCALES}")

Expand Down Expand Up @@ -187,37 +182,46 @@ def download_netcdf(
if abs(area["W"]) > 180 or abs(area["E"]) > 180:
raise ValueError("Longitude must be between -180 and 180")

file = f"{output_dir}/{filename}.nc"

if Path(file).exists():
return file

urllib3.disable_warnings()
conn.retrieve(
"reanalysis-era5-land",
{
"product_type": ["reanalysis"],
"variable": [
"2m_temperature",
"total_precipitation",
"2m_dewpoint_temperature",
"surface_pressure",
],
"date": date_req,
"time": [
"00:00",
"03:00",
"06:00",
"09:00",
"12:00",
"15:00",
"18:00",
"21:00",
],
"area": [area["N"], area["W"], area["S"], area["E"]],
"format": "netcdf",
},
str(file),
).download()
file = Path(output_dir) / f"{filename}.zip"

if file.exists():
return str(file)

conn = Client(
url="https://cds.climate.copernicus.eu/api",
key=cdsapi_token,
)

try:
conn.retrieve(
"reanalysis-era5-land",
{
"product_type": ["reanalysis"],
"variable": [
"2m_temperature",
"total_precipitation",
"2m_dewpoint_temperature",
"surface_pressure",
],
"date": date_req,
"time": [
"00:00",
"03:00",
"06:00",
"09:00",
"12:00",
"15:00",
"18:00",
"21:00",
],
"area": [area["N"], area["W"], area["S"], area["E"]],
"format": "netcdf",
"download_format": "zip",
},
str(file),
)
except (RequestException, KeyboardInterrupt) as e:
file.unlink(missing_ok=True)
raise e

return str(file)
19 changes: 9 additions & 10 deletions satellite/downloader/reanalysis/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def _time_regex_search(text: str):
times: list = api_vars.TIME

regex = re.compile(
"^[\n]*?((\d?\d:\d\d[\n]?)*?-?(\d\d:\d\d?)?)?,? ?(\d\d?)? ?[hours]*?[\n]*?$"
r"^[\n]*?((\d?\d:\d\d[\n]?)*?-?(\d\d:\d\d?)?)?,? ?(\d\d?)? ?[hours]*?[\n]*?$"
)

matches: list = re.match(regex, text).groups()
Expand All @@ -470,7 +470,7 @@ def _time_regex_search(text: str):
ini: int = times.index(expr[0])
end: int = times.index(expr[1])
if ini < end:
return times[ini : end + 1] # Inclusive
return times[ini: end + 1] # Inclusive
elif ini > end:
res = []
res.extend(times[ini:])
Expand All @@ -480,24 +480,23 @@ def _time_regex_search(text: str):
return None
else:
return None

elif len(expr) == 3:
if expr[0] in times and expr[1] in times and str(expr[2]).isdigit():
ini: int = times.index(expr[0])
end: int = times.index(expr[1])
step: int = int(expr[2])

if ini < end:
return times[ini : end + 1 : step]
elif ini > end:
return times[ini: end + 1: step]

if ini > end:
res = []
res.extend(times[ini:])
res.extend(times[: end + 1])
return sorted(res)[::step]
else:
return None
else:
return None
else:
return None

return None


def _format_prompt():
Expand Down
88 changes: 67 additions & 21 deletions satellite/weather/copebr.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ def __init__(self, xarray_ds: xr.Dataset):
self.locale = "BR"

def to_dataframe(self, geocodes: Union[list[int], int], raw: bool = False):
df = _final_dataframe(dataset=self._ds, geocodes=geocodes, raw=raw)
df = _final_dataframe(
dataset=self._ds,
geocodes=geocodes,
locale=self.locale,
raw=raw
)
if isinstance(df, dask.dataframe.DataFrame):
df = df.compute()
df = df.reset_index(drop=True)
Expand All @@ -102,13 +107,19 @@ def to_sql(
con=con,
schema=schema,
tablename=tablename,
locale=self.locale,
raw=raw,
)
if verbose:
logger.info(f"{geocode} updated on {schema}.{tablename}")

def geocode_ds(self, geocode: int, raw: bool = False):
return _geocode_ds(self._ds, geocode, self.locale, raw)
return _geocode_ds(
ds=self._ds,
geocode=geocode,
locale=self.locale,
raw=raw
)


@xr.register_dataset_accessor("CopeAR")
Expand All @@ -118,7 +129,12 @@ def __init__(self, xarray_ds: xr.Dataset):
self.locale = "AR"

def to_dataframe(self, geocodes: Union[list[str], str], raw: bool = False):
df = _final_dataframe(dataset=self._ds, geocodes=geocodes, raw=raw)
df = _final_dataframe(
dataset=self._ds,
geocodes=geocodes,
locale=self.locale,
raw=raw
)
if isinstance(df, dask.dataframe.DataFrame):
df = df.compute()
df = df.reset_index(drop=True)
Expand Down Expand Up @@ -151,13 +167,21 @@ def geocode_ds(self, geocode: str, raw: bool = False):


def _final_dataframe(
dataset: xr.Dataset, geocodes: Union[list[str | int], int | str], raw=False
dataset: xr.Dataset,
geocodes: Union[list[str | int], int | str],
locale: str,
raw=False
) -> pd.DataFrame:
geocodes = [geocodes] if isinstance(geocodes, int) else geocodes

dfs = []
for geocode in geocodes:
dfs.append(_geocode_to_dataframe(dataset, geocode, raw))
dfs.append(_geocode_to_dataframe(
dataset=dataset,
geocode=geocode,
locale=locale,
raw=raw
))

final_df = dd.concat(dfs)

Expand All @@ -178,9 +202,15 @@ def _geocode_to_sql(
con: Connectable,
schema: str,
tablename: str,
locale: str,
raw: bool,
) -> None:
df = _geocode_to_dataframe(dataset=dataset, geocode=geocode, raw=raw)
df = _geocode_to_dataframe(
dataset=dataset,
geocode=geocode,
locale=locale,
raw=raw
)
df = df.reset_index(drop=False)
if raw:
df = df.rename(columns={"time": "datetime"})
Expand All @@ -197,7 +227,12 @@ def _geocode_to_sql(
del df


def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False) -> pd.DataFrame:
def _geocode_to_dataframe(
dataset: xr.Dataset,
geocode: int,
locale: str,
raw: bool = False
) -> pd.DataFrame:
"""
Returns a DataFrame with the values related to the geocode of a
city according to each country's standard. Extract the values
Expand All @@ -214,13 +249,20 @@ def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False) -> pd.Da
but with two extra columns with the geocode and epiweek,
all other columns are also rounded to 4 decimal places
"""
ds = _geocode_ds(dataset, geocode, raw)
ds = _geocode_ds(
ds=dataset,
geocode=geocode,
locale=locale,
raw=raw
)
df = ds.to_dataframe()
del ds
geocode = [geocode for g in range(len(df))]
df = df.assign(geocode=da.from_array(geocode))
df = df.assign(epiweek=str(Week.fromdate(df.index.to_pydatetime()[0])))
columns_to_round = list(set(df.columns).difference(set(["geocode", "epiweek"])))
columns_to_round = list(
set(df.columns).difference(set(["geocode", "epiweek"]))
)
df[columns_to_round] = df[columns_to_round].map(lambda x: np.round(x, 4))
return df

Expand All @@ -247,7 +289,7 @@ def _geocode_ds(
the data corresponds to a 3h interval range for
each day in the dataset.
"""
lats, lons = _get_latlons(geocode, locale)
lats, lons = _get_latlons(geocode=geocode, locale=locale)

geocode_ds = _convert_to_br_units(
_slice_dataset_by_coord(dataset=ds, lats=lats, lons=lons)
Expand Down Expand Up @@ -288,14 +330,14 @@ def _convert_to_br_units(dataset: xr.Dataset) -> xr.Dataset:
Convert measurement units. Rename the unit names and long names as well.
"""
ds = dataset
vars = list(ds.data_vars.keys())
_vars = list(ds.data_vars.keys())

if "t2m" in vars:
if "t2m" in _vars:
# Convert Kelvin to Celsius degrees
ds["t2m"] = ds.t2m - 273.15
ds["t2m"].attrs = {"units": "degC", "long_name": "Temperatura"}

if "d2m" in vars:
if "d2m" in _vars:
# Calculate Relative Humidity percentage and add to Dataset
ds["d2m"] = ds.d2m - 273.15

Expand All @@ -309,27 +351,28 @@ def _convert_to_br_units(dataset: xr.Dataset) -> xr.Dataset:
"units": "pct",
"long_name": "Umidade Relativa do Ar",
}
if "tp" in vars:
if "tp" in _vars:
# Convert meters to millimeters
ds["tp"] = ds.tp * 1000
ds["tp"] = ds.tp.round(5)
ds["tp"].attrs = {"units": "mm", "long_name": "Precipitação"}
if "msl" in vars:
if "sp" in _vars:
# Convert Pa to ATM
ds["msl"] = ds.msl * 0.00000986923
ds["msl"].attrs = {
ds["sp"] = ds.sp * 0.00000986923
ds["sp"].attrs = {
"units": "atm",
"long_name": "Pressão ao Nível do Mar",
}

with_br_vars = {
parsed_vars = {
"valid_time": "time",
"t2m": "temp",
"tp": "precip",
"msl": "pressao",
"sp": "pressao",
"d2m": "umid",
}

return ds.rename(with_br_vars)
return ds.rename(parsed_vars)


def _reduce_by(ds: xr.Dataset, func, prefix: str):
Expand All @@ -349,7 +392,10 @@ def _reduce_by(ds: xr.Dataset, func, prefix: str):
)


def _get_latlons(geocode: int | str, locale: str) -> tuple[list[float], list[float]]:
def _get_latlons(
geocode: int | str,
locale: str
) -> tuple[list[float], list[float]]:
"""
Extract Latitude and Longitude from a geocode of the specific locale.
"""
Expand Down
Loading

0 comments on commit 5f10564

Please sign in to comment.