
Commit a6be13a
Isolate CopeDatasetExtensionBase into an ABC class, create CopeAR xr extension
luabida committed Oct 17, 2024
1 parent 1262eaa commit a6be13a
Showing 7 changed files with 133 additions and 98 deletions.
1 change: 1 addition & 0 deletions satellite/weather/__init__.py
@@ -4,6 +4,7 @@
import xarray as xr

from .copebr import * # noqa

# from .dsei import * # noqa


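Importing the subpackage is what triggers the `@xr.register_dataset_accessor` decorators in `copebr.py`; note that `dsei` stays commented out, so its accessor is only registered when that module is imported explicitly. A minimal sanity check of the registration, as a sketch:

```
import xarray as xr

import satellite.weather  # noqa: F401  # side effect: registers CopeBR and CopeAR

# Accessors are attached to the xr.Dataset class itself,
# so any instance exposes them.
ds = xr.Dataset()
assert hasattr(ds, "CopeBR") and hasattr(ds, "CopeAR")
```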
195 changes: 118 additions & 77 deletions satellite/weather/copebr.py
@@ -1,45 +1,38 @@
from typing import Union, Literal
from abc import ABC, abstractmethod
from typing import Union

from epiweeks import Week
import pandas as pd
import dask
import dask.array as da # type: ignore
import dask.dataframe as dd # type: ignore
import numpy as np # type: ignore
import xarray as xr # type: ignore
from loguru import logger # type: ignore
from sqlalchemy.engine import Connectable # type: ignore

# from .locales import BR
import dask.array as da
import dask.dataframe as dd
import numpy as np
import xarray as xr
from loguru import logger
from epiweeks import Week
from sqlalchemy.engine import Connectable

from .utils import extract_latlons, extract_coordinates

xr.set_options(keep_attrs=True)


@xr.register_dataset_accessor("copebr")
class CopeBRDatasetExtension:
class CopeDatasetExtensionBase(ABC):
"""
xarray.Dataset.copebr
---------------------
This class is an `xr.Dataset` extension. It works as a dataset
layer with the purpose of enhancing the dataset with new methods.
The expect input dataset is an `netCDF4` file from Copernicus API;
this extension will work on certain data variables, the method that
extracts with the correct parameters can be found in `extract_reanalysis`
module.
This class is an `xr.Dataset` extension base class. Its children work
as a dataset layer, enhancing the xarray dataset with new methods. The
expected input dataset is a `netCDF4` file from the Copernicus API;
this extension works on certain data variables, and the method that
extracts them with the correct parameters can be found in the
`extract_reanalysis` module.
Usage:
```
import satellite.weather as sat
ds = sat.load_dataset('file/path')
ds.Cope<ABV>.to_dataframe(geocode)
ds.Cope<ABV>.geocode_ds(geocode)
```
```
import satellite.weather as sat
ds = sat.load_dataset('file/path')
RJ_geocode = 3304557
rio_df = ds.copebr.to_dataframe(RJ_geocode)
rio_ds = ds.copebr.ds_from_geocode(RJ_geocode)
```
The original dataset will be parsed into the locale's data format and can
be sliced by the geocode of any city in it (for Brazil, IBGE geocodes).
The expected output when the requested data is not `raw` is:
date : datetime object.
@@ -59,38 +52,48 @@ class CopeBRDatasetExtension:
umid_max : Maximum┘
"""

def __init__(
self,
xarray_ds: xr.Dataset,
locale: Literal['BR', 'AR'] = 'BR'
) -> None:
@abstractmethod
def geocode_ds(self, geocode, raw, **kwargs) -> xr.Dataset:
pass

@abstractmethod
def to_dataframe(self, geocodes, raw, **kwargs) -> pd.DataFrame:
pass

@abstractmethod
def to_sql(self, geocodes, con, tablename, schema, raw, **kwargs) -> None:
"""
Reads the data for each geocode and insert the rows into the
database one by one, created by sqlalchemy engine with the URI.
This method is convenient to prevent the memory overhead when
executing with a large amount of geocodes.
"""
pass


@xr.register_dataset_accessor("CopeBR")
class CopeBRDatasetExtension(CopeDatasetExtensionBase):
def __init__(self, xarray_ds: xr.Dataset):
self._ds = xarray_ds
self.locale = locale
self.locale = "BR"

def to_dataframe(self, geocodes: Union[list, int], raw: bool = False):
def to_dataframe(self, geocodes: Union[list[int], int], raw: bool = False):
df = _final_dataframe(dataset=self._ds, geocodes=geocodes, raw=raw)

if isinstance(df, dask.dataframe.DataFrame):
df = df.compute()

df = df.reset_index(drop=True)

return df

def to_sql(
self,
geocodes: Union[list, int],
geocodes: Union[list[int], int],
con: Connectable,
tablename: str,
schema: str,
raw: bool = False,
verbose: bool = True,
) -> None:
"""
Reads the data for each geocode and insert the rows into the
database one by one, created by sqlalchemy engine with the URI.
This method is convenient to prevent the memory overhead when
executing with a large amount of geocodes.
"""
geocodes = [geocodes] if isinstance(geocodes, int) else geocodes
for geocode in geocodes:
_geocode_to_sql(
@@ -101,17 +104,55 @@ def to_sql(
tablename=tablename,
raw=raw,
)
logger.debug(f"{geocode} updated on {schema}.{tablename}")
if verbose:
logger.info(f"{geocode} updated on {schema}.{tablename}")

def geocode_ds(self, geocode: int | str, raw: bool = False):
def geocode_ds(self, geocode: int, raw: bool = False):
return _geocode_ds(self._ds, geocode, self.locale, raw)


@xr.register_dataset_accessor("CopeAR")
class CopeARDatasetExtension(CopeDatasetExtensionBase):
def __init__(self, xarray_ds: xr.Dataset):
self._ds = xarray_ds
self.locale = "AR"

def to_dataframe(self, geocodes: Union[list[str], str], raw: bool = False):
df = _final_dataframe(dataset=self._ds, geocodes=geocodes, raw=raw)
if isinstance(df, dask.dataframe.DataFrame):
df = df.compute()
df = df.reset_index(drop=True)
return df

def to_sql(
self,
geocodes: Union[list[str], str],
con: Connectable,
tablename: str,
schema: str,
raw: bool = False,
verbose: bool = True,
):
geocodes = [geocodes] if isinstance(geocodes, str) else geocodes
for geocode in geocodes:
_geocode_to_sql(
dataset=self._ds,
geocode=geocode,
con=con,
schema=schema,
tablename=tablename,
raw=raw,
)
if verbose:
logger.info(f"{geocode} updated on {schema}.{tablename}")

def geocode_ds(self, geocode: str, raw: bool = False):
return _geocode_ds(self._ds, geocode, self.locale, raw)


def _final_dataframe(
dataset: xr.Dataset,
geocodes: Union[list, int],
raw=False
):
dataset: xr.Dataset, geocodes: Union[list[str | int], int | str], raw=False
) -> pd.DataFrame:
geocodes = [geocodes] if isinstance(geocodes, (int, str)) else geocodes

dfs = []
@@ -138,7 +179,7 @@ def _geocode_to_sql(
schema: str,
tablename: str,
raw: bool,
):
) -> None:
df = _geocode_to_dataframe(dataset=dataset, geocode=geocode, raw=raw)
df = df.reset_index(drop=False)
if raw:
@@ -156,14 +197,14 @@ def _geocode_to_sql(
del df


def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False):
def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False) -> pd.DataFrame:
"""
Returns a DataFrame with the values related to the geocode of a
brazilian city according to IBGE's format. Extract the values
city according to each country's standard. Extracts the values
using `geocode_ds()` and returns `xr.Dataset.to_dataframe()`
from Xarray, inserting the geocode into the final DataFrame.
Attrs:
geocode (str or int): Geocode of a city in Brazil according to IBGE.
geocode (str or int): Geocode of a city.
raw (bool) : If raw is set to True, the DataFrame returned
will contain data in 3 hours intervals.
Default return will aggregate these values
@@ -179,25 +220,26 @@ def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False):
geocode = [geocode for g in range(len(df))]
df = df.assign(geocode=da.from_array(geocode))
df = df.assign(epiweek=str(Week.fromdate(df.index.to_pydatetime()[0])))
columns_to_round = list(set(df.columns).difference(
set(["geocode", "epiweek"]))
)
columns_to_round = list(set(df.columns).difference(set(["geocode", "epiweek"])))
df[columns_to_round] = df[columns_to_round].map(lambda x: np.round(x, 4))
return df
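For reference, the `epiweek` column above comes from the `epiweeks` package, applied to the first timestamp of the frame; a quick sketch of the conversion:

```
from datetime import date

from epiweeks import Week

# Week.fromdate maps a calendar date to its CDC epidemiological week;
# str() renders it as YYYYWW, the format stored in the `epiweek` column.
week = Week.fromdate(date(2024, 10, 17))
print(str(week))  # "202442"
```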


def _geocode_ds(ds: xr.Dataset, geocode: int | str, locale: str, raw=False):
def _geocode_ds(
ds: xr.Dataset, geocode: int | str, locale: str, raw=False
) -> xr.Dataset:
"""
This is the most important method of the extension. It will
slice the dataset according to the geocode provided, do the
math and the parse of the units to Br's format, and reduce by
min, mean and max by day, if the `raw` is false.
Attrs:
geocode (str|int): Geocode of a Brazilian city according to IBGE.
geocode (str|int): Geocode of a city.
raw (bool) : If raw is set to True, the DataFrame returned
will contain data in 3 hours intervals. Default
return will aggregate these values into 24h
interval.
locale (str) : Country abbreviation. Example: 'BR'
Returns:
xr.Dataset: The final dataset with the data parsed into the
locale's format. If not `raw`, will group the data by day,
@@ -230,7 +272,9 @@ def _geocode_ds(ds: xr.Dataset, geocode: int | str, locale: str, raw=False):
return final_ds


def _slice_dataset_by_coord(dataset: xr.Dataset, lats: list[int], lons: list[int]):
def _slice_dataset_by_coord(
dataset: xr.Dataset, lats: list[float], lons: list[float]
) -> xr.Dataset:
"""
Slices a dataset using latitudes and longitudes, returns a dataset
with the mean values between the coordinates.
@@ -241,8 +285,7 @@ def _slice_dataset_by_coord(dataset: xr.Dataset, lats: list[int], lons: list[int

def _convert_to_br_units(dataset: xr.Dataset) -> xr.Dataset:
"""
Parse the units according to Brazil's standard unit measures.
Rename their unit names and long names as well.
Parse measure units. Rename their unit names and long names as well.
"""
ds = dataset
vars = list(ds.data_vars.keys())
@@ -306,26 +349,24 @@ def _reduce_by(ds: xr.Dataset, func, prefix: str):
)


def _get_latlons(
geocode: int | str, locale: str
) -> tuple[list[float], list[float]]:
def _get_latlons(geocode: int | str, locale: str) -> tuple[list[float], list[float]]:
"""
Extract Latitude and Longitude from a Brazilian's city
according to IBGE's geocode format.
Extracts latitude and longitude from a geocode of the given locale.
"""
lat, lon = extract_latlons.from_geocode(int(geocode), locale)
N, S, E, W = extract_coordinates.from_latlon(lat, lon)

lats = [N, S]
lons = [E, W]

match geocode:
case 4108304: # Foz do Iguaçu
lats = [-25.5]
lons = [-54.5, -54.75]
if locale == "BR":
match geocode:
case 4108304: # Foz do Iguaçu - BR
lats = [-25.5]
lons = [-54.5, -54.75]

case 3548500: # Santos (SP)
lats = [-24.0]
lons = [-46.25, -46.5]
case 3548500: # Santos (SP) - BR
lats = [-24.0]
lons = [-46.25, -46.5]

return lats, lons
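With the accessors now split by locale, usage mirrors the docstrings above. A minimal sketch, assuming a NetCDF file already extracted from the Copernicus API; the file path, the Argentinian geocode, the database URI and the table/schema names are placeholders:

```
import satellite.weather as sat
from sqlalchemy import create_engine

ds = sat.load_dataset("data/cope_20241017.nc")  # placeholder path

# Brazilian cities are keyed by integer IBGE geocodes.
rio_df = ds.CopeBR.to_dataframe(3304557)           # Rio de Janeiro, daily values
rio_raw = ds.CopeBR.geocode_ds(3304557, raw=True)  # keep 3-hourly values

# Argentinian geocodes are strings, handled by the CopeAR accessor.
ar_df = ds.CopeAR.to_dataframe("02")  # placeholder geocode

# Row-by-row insertion keeps memory bounded with many geocodes.
engine = create_engine("postgresql://user:pass@localhost/db")  # placeholder URI
ds.CopeBR.to_sql(
    [3304557, 3548500],         # Rio de Janeiro and Santos
    con=engine,
    tablename="weather_daily",  # placeholder table
    schema="public",            # placeholder schema
)
```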
22 changes: 10 additions & 12 deletions satellite/weather/dsei.py
@@ -3,8 +3,10 @@
import xarray as xr
import numpy as np
from loguru import logger
from matplotlib.path import Path # type: ignore
from shapely.geometry.polygon import Polygon # type: ignore
from matplotlib.path import Path
from shapely.geometry.polygon import Polygon

from .locales.BR import DSEI


@xr.register_dataset_accessor("DSEI")
@@ -22,22 +24,21 @@ class CopeDSEIDatasetExtension:
```
"""

DSEIs = brazil.DSEI.areas.DSEI_DF
DSEIs = DSEI.areas.DSEI_DF
_dsei_df = None

def __init__(self, xarray_ds: xr.Dataset) -> None:
self._ds = xarray_ds
self._grid = self.__do_grid()

def load_polygons(self):
df = brazil.DSEI.areas.load_polygons_df()
df = DSEI.areas.load_polygons_df()
self._dsei_df = df
logger.info("DSEI Polygons loaded")

def get_polygon(self, dsei: Union[str, int]) -> Polygon:
if self._dsei_df is None:
logger.error(
"Polygons are not loaded. Use `.DSEI.load_poligons()`")
logger.error("Polygons are not loaded. Use `.DSEI.load_poligons()`")
return None

polygon = self.__do_polygon(dsei)
@@ -48,11 +49,9 @@ def __getitem__(self, __dsei: Union[str, int] = None):
return self.__do_dataset(__dsei)
except AttributeError:
if self._dsei_df is None:
logger.error(
"Polygons are not loaded. Use `.DSEI.load_poligons()`")
logger.error("Polygons are not loaded. Use `.DSEI.load_poligons()`")
return None
logger.error(
f"{__dsei} not found. List all DSEIs with `.DSEI.info()`")
logger.error(f"{__dsei} not found. List all DSEIs with `.DSEI.info()`")
return None

def __do_grid(self):
@@ -64,8 +63,7 @@ def __do_grid(self):
def __do_polygon(self, __dsei: Union[str, int]) -> Polygon:
if isinstance(__dsei, str):
cod = float(self.DSEIs[self.DSEIs.DSEI == __dsei].code)
polygon = self._dsei_df[self._dsei_df.cod_dsei ==
cod].geometry.item()
polygon = self._dsei_df[self._dsei_df.cod_dsei == cod].geometry.item()
elif isinstance(__dsei, int):
polygon = self._dsei_df[
self._dsei_df.cod_dsei == float(__dsei)
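The DSEI accessor keeps its load-then-slice flow; a short sketch (the DSEI name is illustrative — valid ones can be listed with `.DSEI.info()`):

```
import satellite.weather as sat
import satellite.weather.dsei  # noqa: F401  # registers the DSEI accessor

ds = sat.load_dataset("data/cope_20241017.nc")  # placeholder path

ds.DSEI.load_polygons()                    # required before any polygon lookup
polygon = ds.DSEI.get_polygon("Yanomami")  # illustrative DSEI name
sliced = ds.DSEI["Yanomami"]               # dataset sliced to that DSEI's area
```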
1 change: 0 additions & 1 deletion satellite/weather/locales/BR/DSEI/__init__.py

This file was deleted.

3 changes: 0 additions & 3 deletions satellite/weather/locales/BR/__init__.py

This file was deleted.

1 change: 1 addition & 0 deletions satellite/weather/utils/extract_coordinates.py
@@ -12,6 +12,7 @@
from_latlon(latitude, longitude) : Returns North, South, East and West given a
coordinate.
"""

from functools import lru_cache

import numpy as np
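The two helpers combined in `_get_latlons` can also be used directly; a small sketch with Rio de Janeiro's IBGE geocode:

```
from satellite.weather.utils import extract_coordinates, extract_latlons

# Centroid coordinates for the geocode, then the corners (N, S, E, W)
# of the surrounding grid cell.
lat, lon = extract_latlons.from_geocode(3304557, "BR")
N, S, E, W = extract_coordinates.from_latlon(lat, lon)
lats, lons = [N, S], [E, W]
```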

