Rollback/remove trio due to being very CPU costly with xagg
luabida committed Oct 30, 2024
1 parent 04f6b26 commit f0729ba
Showing 5 changed files with 86 additions and 124 deletions.
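The commit message gives the rationale: the per-ADM work xagg does is CPU-bound, so the trio nursery plus worker-thread fan-out removed from satellite/extensions/cope.py below buys no real parallelism under the GIL while adding event-loop overhead. Below is a minimal sketch of the two patterns; `work` is a hypothetical stand-in for the xagg aggregation, not the project's code:

import trio


def work(code: int) -> int:
    # CPU-bound stand-in for an xagg aggregation; holds the GIL while running.
    return sum(i * code for i in range(1_000_000))


async def with_trio(codes: list[int]) -> list[int]:
    # The removed pattern: a nursery fans each item out to a worker thread,
    # but the GIL serializes the computation anyway (completion order varies).
    results: list[int] = []

    async def one(code: int) -> None:
        results.append(await trio.to_thread.run_sync(work, code))

    async with trio.open_nursery() as nursery:
        for code in codes:
            nursery.start_soon(one, code)
    return results


def sequential(codes: list[int]) -> list[int]:
    # The restored pattern: a plain loop, no event loop or thread hops.
    return [work(code) for code in codes]


if __name__ == "__main__":
    print(trio.run(with_trio, [1, 2, 3]))
    print(sequential([1, 2, 3]))

The cope.py diff below replaces exactly this kind of fan-out with the sequential form.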
75 changes: 28 additions & 47 deletions poetry.lock (generated lockfile; diff not rendered)

3 changes: 1 addition & 2 deletions pyproject.toml
@@ -30,7 +30,7 @@ packages = [
 ]
 
 [tool.poetry.dependencies]
-python = ">=3.11,<3.12"
+python = ">=3.10,<3.12"
 cdsapi = ">=0.7.3"
 pandas = ">=2.0.0"
 numpy = ">=1.16.4"
@@ -57,7 +57,6 @@ duckdb = "^1.1.2"
 h5netcdf = "^1.4.0"
 rioxarray = "^0.17.0"
 xagg = "^0.3.2.4"
-trio = "^0.27.0"
 
 [tool.poetry.group.dev.dependencies]
 pytest = ">=7.4"
104 changes: 48 additions & 56 deletions satellite/extensions/cope.py
@@ -5,7 +5,6 @@
 import numpy as np
 import xarray as xr
 import xagg as xa
-import trio
 from loguru import logger
 from epiweeks import Week
 
@@ -72,7 +71,11 @@ def __init__(self, xarray_ds: xr.Dataset):
         self._ds = xarray_ds
 
     def to_dataframe(self, adms: Union[list[ADM], ADM]) -> pd.DataFrame:
-        return trio.run(self._ato_dataframe, adms)
+        adms = [adms] if isinstance(adms, ADMBase) else adms
+        dfs = []
+        for adm in adms:
+            dfs.append(_adm_to_dataframe(self._ds, adm=adm))
+        return pd.concat(dfs)
 
     def to_sql(
         self,
@@ -84,65 +87,54 @@ def to_sql(
         verbose: bool = True,
     ) -> None:
         adms = [adms] if isinstance(adms, ADMBase) else adms
-        trio.run(self._ato_sql_async, adms, con, tablename, schema, verbose)
+        for adm in adms:
+            _geocode_to_sql(
+                dataset=self._ds,
+                adm=adm,
+                con=con,
+                schema=schema,
+                tablename=tablename,
+            )
+            if verbose:
+                logger.info(
+                    f"{adm.code} updated on "
+                    f"{schema + '.' if schema else ''}{tablename}"
+                )
 
-    async def _ato_dataframe(self, adms: Union[list[ADM], ADM]) -> pd.DataFrame:
-        adms = [adms] if isinstance(adms, ADMBase) else adms
-        dfs = []
-        async with trio.open_nursery() as nursery:
-            for adm in adms:
-                nursery.start_soon(self._adm_dataframe, adm, dfs)
-        return pd.concat(dfs)
+    def adm_ds(self, adm: ADM):
+        return _adm_ds(ds=self._ds, adm=adm)
 
-    async def _adm_dataframe(self, adm: ADM, dfs: list) -> None:
-        ds = await trio.to_thread.run_sync(self.adm_ds, adm)
-        df = ds.to_dataframe().reset_index()
-        df = df.assign(epiweek=str(Week.fromdate(pd.to_datetime(df.time)[0])))
-        columns_to_round = list(
-            set(df.columns).difference(set(["time", "code", "name", "epiweek"]))
-        )
-        df[columns_to_round] = df[columns_to_round].map(lambda x: np.round(x, 4))
-        df = df.drop(columns=["poly_idx", "name"])
-        df = df.rename(columns={"time": "date", "code": "geocode"})
-        dfs.append(df)
 
-    async def _ato_sql_async(
-        self,
-        adms: Union[list[int], int],
-        con,
-        tablename: str,
-        schema: Optional[str] = None,
-        verbose: bool = True,
-    ) -> None:
-        async with trio.open_nursery() as nursery:
-            for adm in adms:
-                nursery.start_soon(
-                    self._adm_to_sql, adm, con, tablename, schema, verbose
-                )
+def _geocode_to_sql(
+    dataset: xr.Dataset,
+    adm: ADM,
+    con,
+    schema: str,
+    tablename: str,
+) -> None:
+    df = _adm_to_dataframe(dataset=dataset, adm=adm)
+    df.to_sql(
+        name=tablename,
+        schema=schema,
+        con=con,
+        if_exists="append",
+        index=False,
+    )
+    del df
 
-    async def _adm_to_sql(
-        self,
-        adm: ADM,
-        con,
-        tablename: str,
-        schema: Optional[str],
-        verbose: bool,
-    ) -> None:
-        df = await self._ato_dataframe(adm)
-        df.to_sql(
-            name=tablename,
-            schema=schema,
-            con=con,
-            if_exists="append",
-            index=False,
-        )
-        if verbose:
-            logger.info(
-                f"{adm.code} updated on {schema + '.' if schema else ''}{tablename}"
-            )
 
-    def adm_ds(self, adm: ADM):
-        return _adm_ds(ds=self._ds, adm=adm)
+def _adm_to_dataframe(dataset: xr.Dataset, adm: ADM) -> pd.DataFrame:
+    ds = _adm_ds(ds=dataset, adm=adm)
+    df = ds.to_dataframe().reset_index()
+    del ds
+    df = df.assign(epiweek=str(Week.fromdate(pd.to_datetime(df.time)[0])))
+    columns_to_round = list(
+        set(df.columns).difference(set(["time", "code", "name", "epiweek"]))
+    )
    df[columns_to_round] = df[columns_to_round].map(lambda x: np.round(x, 4))
+    df = df.drop(columns=["poly_idx", "name"])
+    df = df.rename(columns={"time": "date", "code": "geocode"})
+    return df
 
 
 def _adm_ds(ds: xr.Dataset, adm: ADM) -> xr.Dataset:
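The new module-level _adm_to_dataframe keeps the same post-processing the old async _adm_dataframe did: tag every row with the epidemiological week of the first timestamp, round numeric columns to four decimals, and rename time/code to date/geocode. The epiweek step in isolation, on a toy frame (column names and values here are hypothetical, not from the package):

import pandas as pd
from epiweeks import Week

# Toy stand-in for an ADM-aggregated dataframe.
df = pd.DataFrame(
    {"time": pd.to_datetime(["2024-10-30", "2024-10-30"]), "t2m": [23.45678, 21.1]}
)
# Same call the diff uses: the epiweek of the first timestamp, stringified.
df = df.assign(epiweek=str(Week.fromdate(pd.to_datetime(df.time)[0])))
df["t2m"] = df["t2m"].round(4)
df = df.rename(columns={"time": "date"})
print(df)  # every row carries the same epiweek string (e.g. "202444" for 2024-10-30)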
26 changes: 8 additions & 18 deletions satellite/models.py
@@ -236,29 +236,19 @@ class ERA5LandRequest(BaseRequest):
     request: ERA5LandSpecs = Field(default=ERA5LandSpecs(), validate_default=True)
 
     # pylint: disable=maybe-no-member
-    def download(self, output: Optional[str] = None) -> str:
+    def download(self, output: str) -> str:
         request: ERA5LandSpecs = self.request
+        output = Path(output)
 
-        output = Path(output) if output else Path(".")
-        if output.is_dir():
-            output.mkdir(parents=True, exist_ok=True)
-            fname_parts = [self.name, request.date]
-            if request.locale:
-                fname_parts.append(request.locale)
-            s = ".nc" if request.format == "netcdf" else ".grib"
-            suffix = ".zip" if request.download_format == "zip" else s
-            output = (output / "_".join(fname_parts)).with_suffix(suffix)
+        if request.download_format == "zip":
+            output = output.with_suffix(".zip")
         else:
-            output.parent.mkdir(parents=True, exist_ok=True)
-            if request.download_format == "zip":
-                output = output.with_suffix(".zip")
+            if request.format == "netcdf":
+                output = output.with_suffix(".nc")
             else:
-                if request.format == "netcdf":
-                    output = output.with_suffix(".nc")
-                else:
-                    output = output.with_suffix(".grib")
+                output = output.with_suffix(".grib")
 
-        if output.exists():
+        if not output.is_dir() and output.exists():
             return str(output)
 
         client = self.get_client(self.api_key)
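The rewritten download makes output a required path and derives the suffix directly from the request: .zip when download_format is "zip", otherwise .nc for netcdf and .grib for anything else. The same decision as a standalone sketch (pick_suffix is a hypothetical helper mirroring the diff above):

from pathlib import Path


def pick_suffix(output: str, download_format: str, data_format: str) -> Path:
    # Mirrors the branching in ERA5LandRequest.download after this commit.
    path = Path(output)
    if download_format == "zip":
        return path.with_suffix(".zip")
    if data_format == "netcdf":
        return path.with_suffix(".nc")
    return path.with_suffix(".grib")


print(pick_suffix("data/era5land_2024-10-30", "unarchived", "netcdf"))
# -> data/era5land_2024-10-30.nc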
2 changes: 1 addition & 1 deletion satellite/request.py
@@ -8,7 +8,7 @@
 
 
 def reanalysis_era5_land(
-    output: Optional[str] = None,
+    output: str,
     api_token: Optional[str] = None,
     product_type: list[str] = ["reanalysis"],
     variable: list[str] = [
