diff --git a/satellite/weather/__init__.py b/satellite/weather/__init__.py
index 7e6ad74..c70c770 100644
--- a/satellite/weather/__init__.py
+++ b/satellite/weather/__init__.py
@@ -4,6 +4,7 @@
 import xarray as xr
 
 from .copebr import *  # noqa
+
 # from .dsei import *  # noqa
diff --git a/satellite/weather/copebr.py b/satellite/weather/copebr.py
index 7ae0c44..3725200 100644
--- a/satellite/weather/copebr.py
+++ b/satellite/weather/copebr.py
@@ -1,45 +1,38 @@
-from typing import Union, Literal
+from abc import ABC, abstractmethod
+from typing import Union
 
-from epiweeks import Week
+import pandas as pd
 import dask
-import dask.array as da  # type: ignore
-import dask.dataframe as dd  # type: ignore
-import numpy as np  # type: ignore
-import xarray as xr  # type: ignore
-from loguru import logger  # type: ignore
-from sqlalchemy.engine import Connectable  # type: ignore
-
-# from .locales import BR
+import dask.array as da
+import dask.dataframe as dd
+import numpy as np
+import xarray as xr
+from loguru import logger
+from epiweeks import Week
+from sqlalchemy.engine import Connectable
+
 from .utils import extract_latlons, extract_coordinates
 
 xr.set_options(keep_attrs=True)
 
 
-@xr.register_dataset_accessor("copebr")
-class CopeBRDatasetExtension:
+class CopeDatasetExtensionBase(ABC):
     """
-    xarray.Dataset.copebr
-    ---------------------
-
-    This class is an `xr.Dataset` extension. It works as a dataset
-    layer with the purpose of enhancing the dataset with new methods.
-    The expect input dataset is an `netCDF4` file from Copernicus API;
-    this extension will work on certain data variables, the method that
-    extracts with the correct parameters can be found in `extract_reanalysis`
-    module.
+    This class is an `xr.Dataset` extension base class. Its children
+    work as a dataset layer with the purpose of enhancing the xarray
+    dataset with new methods. The expected input dataset is a `netCDF4`
+    file from the Copernicus API; this extension will work on certain
+    data variables, and the method that extracts them with the correct
+    parameters can be found in the `extract_reanalysis` module.
 
     Usage:
+    ```
+    import satellite.weather as sat
+    ds = sat.load_dataset('file/path')
+    ds.CopeBR.to_dataframe(geocode)
+    ds.CopeBR.geocode_ds(geocode)
+    ```
 
-    ```
-    import satellite.weather as sat
-    ds = sat.load_dataset('file/path')
-    RJ_geocode = 3304557
-    rio_df = ds.copebr.to_dataframe(RJ_geocode)
-    rio_ds = ds.copebr.ds_from_geocode(RJ_geocode)
-    ```
-
-    The original dataset will be parsed into Brazilian's data format and can
-    be sliced by a Geocode from any City in Brazil, according to IBGE geocodes.
     The expect output when the requested data is not `raw` is:
 
     date       : datetime object.
@@ -59,38 +52,48 @@ class CopeBRDatasetExtension:
     umid_max   : Maximum┘
     """
 
-    def __init__(
-        self,
-        xarray_ds: xr.Dataset,
-        locale: Literal['BR', 'AR'] = 'BR'
-    ) -> None:
+    @abstractmethod
+    def geocode_ds(self, geocode, raw, **kwargs) -> xr.Dataset:
+        pass
+
+    @abstractmethod
+    def to_dataframe(self, geocodes, raw, **kwargs) -> pd.DataFrame:
+        pass
+
+    @abstractmethod
+    def to_sql(self, geocodes, con, tablename, schema, raw, **kwargs) -> None:
+        """
+        Reads the data for each geocode and inserts the rows into the
+        database one by one, using a sqlalchemy engine created with the
+        URI. This method is convenient to prevent memory overhead when
+        executing with a large amount of geocodes.
+ """ + pass + + +@xr.register_dataset_accessor("CopeBR") +class CopeBRDatasetExtension: + def __init__(self, xarray_ds: xr.Dataset): self._ds = xarray_ds - self.locale = locale + self.locale = "BR" - def to_dataframe(self, geocodes: Union[list, int], raw: bool = False): + def to_dataframe(self, geocodes: Union[list[int], int], raw: bool = False): df = _final_dataframe(dataset=self._ds, geocodes=geocodes, raw=raw) - if isinstance(df, dask.dataframe.DataFrame): df = df.compute() - df = df.reset_index(drop=True) return df def to_sql( self, - geocodes: Union[list, int], + geocodes: Union[list[int], int], con: Connectable, tablename: str, schema: str, raw: bool = False, + verbose: bool = True, ) -> None: - """ - Reads the data for each geocode and insert the rows into the - database one by one, created by sqlalchemy engine with the URI. - This method is convenient to prevent the memory overhead when - executing with a large amount of geocodes. - """ geocodes = [geocodes] if isinstance(geocodes, int) else geocodes for geocode in geocodes: _geocode_to_sql( @@ -101,17 +104,55 @@ def to_sql( tablename=tablename, raw=raw, ) - logger.debug(f"{geocode} updated on {schema}.{tablename}") + if verbose: + logger.info(f"{geocode} updated on {schema}.{tablename}") - def geocode_ds(self, geocode: int | str, raw: bool = False): + def geocode_ds(self, geocode: int, raw: bool = False): + return _geocode_ds(self._ds, geocode, self.locale, raw) + + +@xr.register_dataset_accessor("CopeAR") +class CopeARDatasetExtension: + def __init__(self, xarray_ds: xr.Dataset): + self._ds = xarray_ds + self.locale = "AR" + + def to_dataframe(self, geocodes: Union[list[str], str], raw: bool = False): + df = _final_dataframe(dataset=self._ds, geocodes=geocodes, raw=raw) + if isinstance(df, dask.dataframe.DataFrame): + df = df.compute() + df = df.reset_index(drop=True) + return df + + def to_sql( + self, + geocodes: Union[list[str], str], + con: Connectable, + tablename: str, + schema: str, + raw: bool = False, + verbose: bool = True, + ): + geocodes = [geocodes] if isinstance(geocodes, int) else geocodes + for geocode in geocodes: + _geocode_to_sql( + dataset=self._ds, + geocode=geocode, + con=con, + schema=schema, + tablename=tablename, + raw=raw, + ) + if verbose: + logger.info(f"{geocode} updated on {schema}.{tablename}") + + def geocode_ds(self, geocode: str, raw: bool = False): return _geocode_ds(self._ds, geocode, self.locale, raw) def _final_dataframe( - dataset: xr.Dataset, - geocodes: Union[list, int], - raw=False -): + dataset: xr.Dataset, geocodes: Union[list[str | int], int | str], raw=False +) -> pd.DataFrame: geocodes = [geocodes] if isinstance(geocodes, int) else geocodes dfs = [] @@ -138,7 +179,7 @@ def _geocode_to_sql( schema: str, tablename: str, raw: bool, -): +) -> None: df = _geocode_to_dataframe(dataset=dataset, geocode=geocode, raw=raw) df = df.reset_index(drop=False) if raw: @@ -156,14 +197,14 @@ def _geocode_to_sql( del df -def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False): +def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False) -> pd.DataFrame: """ Returns a DataFrame with the values related to the geocode of a - brazilian city according to IBGE's format. Extract the values + city according to each country's standard. Extract the values using `ds_from_geocode()` and return `xr.Dataset.to_dataframe()` from Xarray, inserting the geocode into the final DataFrame. Attrs: - geocode (str or int): Geocode of a city in Brazil according to IBGE. 
+        geocode (str or int): Geocode of a city.
         raw (bool)          : If raw is set to True, the DataFrame returned
                               will contain data in 3 hours intervals.
                               Default return will aggregate these values
@@ -179,25 +220,26 @@ def _geocode_to_dataframe(dataset: xr.Dataset, geocode: int, raw=False):
         geocode = [geocode for g in range(len(df))]
         df = df.assign(geocode=da.from_array(geocode))
         df = df.assign(epiweek=str(Week.fromdate(df.index.to_pydatetime()[0])))
-    columns_to_round = list(set(df.columns).difference(
-        set(["geocode", "epiweek"]))
-    )
+    columns_to_round = list(set(df.columns).difference(set(["geocode", "epiweek"])))
     df[columns_to_round] = df[columns_to_round].map(lambda x: np.round(x, 4))
     return df
 
 
-def _geocode_ds(ds: xr.Dataset, geocode: int | str, locale: str, raw=False):
+def _geocode_ds(
+    ds: xr.Dataset, geocode: int | str, locale: str, raw=False
+) -> xr.Dataset:
     """
     This is the most important method of the extension. It will
     slice the dataset according to the geocode provided, do the
    math and the parse of the units to Br's format, and reduce by
     min, mean and max by day, if the `raw` is false.
     Attrs:
-        geocode (str|int): Geocode of a Brazilian city according to IBGE.
+        geocode (str|int): Geocode of a city.
         raw (bool)       : If raw is set to True, the DataFrame returned
                            will contain data in 3 hours intervals. Default
                            return will aggregate these values
                            into 24h interval.
+        locale (str)     : Country abbreviation. Example: 'BR'
     Returns:
         xr.Dataset: The final dataset with the data parsed into Br's
                     format. If not `raw`, will group the data by day,
@@ -230,7 +272,9 @@ def _geocode_ds(ds: xr.Dataset, geocode: int | str, locale: str, raw=False):
     return final_ds
 
 
-def _slice_dataset_by_coord(dataset: xr.Dataset, lats: list[int], lons: list[int]):
+def _slice_dataset_by_coord(
+    dataset: xr.Dataset, lats: list[int], lons: list[int]
+) -> xr.Dataset:
     """
     Slices a dataset using latitudes and longitudes, returns a dataset
     with the mean values between the coordinates.
@@ -241,8 +285,7 @@ def _slice_dataset_by_coord(
 
 def _convert_to_br_units(dataset: xr.Dataset) -> xr.Dataset:
     """
-    Parse the units according to Brazil's standard unit measures.
-    Rename their unit names and long names as well.
+    Parse measure units. Rename their unit names and long names as well.
     """
     ds = dataset
     vars = list(ds.data_vars.keys())
@@ -306,12 +349,9 @@ def _reduce_by(ds: xr.Dataset, func, prefix: str):
     )
 
 
-def _get_latlons(
-    geocode: int | str, locale: str
-) -> tuple[list[float], list[float]]:
+def _get_latlons(geocode: int | str, locale: str) -> tuple[list[float], list[float]]:
     """
-    Extract Latitude and Longitude from a Brazilian's city
-    according to IBGE's geocode format.
+    Extract Latitude and Longitude from a geocode of the specific locale.
""" lat, lon = extract_latlons.from_geocode(int(geocode), locale) N, S, E, W = extract_coordinates.from_latlon(lat, lon) @@ -319,13 +359,14 @@ def _get_latlons( lats = [N, S] lons = [E, W] - match geocode: - case 4108304: # Foz do Iguaçu - lats = [-25.5] - lons = [-54.5, -54.75] + if locale == "BR": + match geocode: + case 4108304: # Foz do Iguaçu - BR + lats = [-25.5] + lons = [-54.5, -54.75] - case 3548500: # Santos (SP) - lats = [-24.0] - lons = [-46.25, -46.5] + case 3548500: # Santos (SP) - BR + lats = [-24.0] + lons = [-46.25, -46.5] return lats, lons diff --git a/satellite/weather/dsei.py b/satellite/weather/dsei.py index 10b303b..5160244 100644 --- a/satellite/weather/dsei.py +++ b/satellite/weather/dsei.py @@ -3,8 +3,10 @@ import xarray as xr import numpy as np from loguru import logger -from matplotlib.path import Path # type: ignore -from shapely.geometry.polygon import Polygon # type: ignore +from matplotlib.path import Path +from shapely.geometry.polygon import Polygon + +from .locales.BR import DSEI @xr.register_dataset_accessor("DSEI") @@ -22,7 +24,7 @@ class CopeDSEIDatasetExtension: ``` """ - DSEIs = brazil.DSEI.areas.DSEI_DF + DSEIs = DSEI.areas.DSEI_DF _dsei_df = None def __init__(self, xarray_ds: xr.Dataset) -> None: @@ -30,14 +32,13 @@ def __init__(self, xarray_ds: xr.Dataset) -> None: self._grid = self.__do_grid() def load_polygons(self): - df = brazil.DSEI.areas.load_polygons_df() + df = DSEI.areas.load_polygons_df() self._dsei_df = df logger.info("DSEI Polygons loaded") def get_polygon(self, dsei: Union[str, int]) -> Polygon: if self._dsei_df is None: - logger.error( - "Polygons are not loaded. Use `.DSEI.load_poligons()`") + logger.error("Polygons are not loaded. Use `.DSEI.load_poligons()`") return None polygon = self.__do_polygon(dsei) @@ -48,11 +49,9 @@ def __getitem__(self, __dsei: Union[str, int] = None): return self.__do_dataset(__dsei) except AttributeError: if self._dsei_df is None: - logger.error( - "Polygons are not loaded. Use `.DSEI.load_poligons()`") + logger.error("Polygons are not loaded. Use `.DSEI.load_poligons()`") return None - logger.error( - f"{__dsei} not found. List all DSEIs with `.DSEI.info()`") + logger.error(f"{__dsei} not found. List all DSEIs with `.DSEI.info()`") return None def __do_grid(self): @@ -64,8 +63,7 @@ def __do_grid(self): def __do_polygon(self, __dsei: Union[str, int]) -> Polygon: if isinstance(__dsei, str): cod = float(self.DSEIs[self.DSEIs.DSEI == __dsei].code) - polygon = self._dsei_df[self._dsei_df.cod_dsei == - cod].geometry.item() + polygon = self._dsei_df[self._dsei_df.cod_dsei == cod].geometry.item() elif isinstance(__dsei, int): polygon = self._dsei_df[ self._dsei_df.cod_dsei == float(__dsei) diff --git a/satellite/weather/locales/BR/DSEI/__init__.py b/satellite/weather/locales/BR/DSEI/__init__.py deleted file mode 100644 index 19fd01c..0000000 --- a/satellite/weather/locales/BR/DSEI/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from . import areas # noqa diff --git a/satellite/weather/locales/BR/__init__.py b/satellite/weather/locales/BR/__init__.py deleted file mode 100644 index 7f4fb72..0000000 --- a/satellite/weather/locales/BR/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from . 
-from . import DSEI, extract_coordinates, extract_latlons  # noqa
-
-# TODO: Docstrings
diff --git a/satellite/weather/utils/extract_coordinates.py b/satellite/weather/utils/extract_coordinates.py
index 3a96f8d..9bd8de4 100644
--- a/satellite/weather/utils/extract_coordinates.py
+++ b/satellite/weather/utils/extract_coordinates.py
@@ -12,6 +12,7 @@
 from_latlon(latitude, longitude) : Returns North, South, East and West
                                    given a coordinate.
 """
+
 from functools import lru_cache
 
 import numpy as np
diff --git a/satellite/weather/utils/extract_latlons.py b/satellite/weather/utils/extract_latlons.py
index df1b583..5cacd24 100644
--- a/satellite/weather/utils/extract_latlons.py
+++ b/satellite/weather/utils/extract_latlons.py
@@ -21,9 +21,7 @@
 
 @lru_cache
 def _read_locale_json(locale: str) -> pd.DataFrame:
-    return pd.read_json(
-        f"{Path(__file__).parent.parent}/locales/{locale}/muns.json"
-    )
+    return pd.read_json(f"{Path(__file__).parent.parent}/locales/{locale}/muns.json")
 
 
 def from_geocode(geocode: int | str, locale: str) -> tuple:
@@ -36,7 +34,7 @@ def from_geocode(geocode: int | str, locale: str) -> tuple:
         locale (str)  : Country abbreviation. Example: 'BR'
 
     Returns:
-        lat (float)   : Latitude of geocode in degrees between -90 and 90.
+        lat (float)   : Latitude of geocode in degrees between -90 and 90.
                         Represents the North and South coordinates.
         lon (float)   : Longitude of geocode in degrees
@@ -52,6 +50,6 @@ def from_geocode(geocode: int | str, locale: str) -> tuple:
     if lat_lon.empty:
         raise ValueError(f"Geocode {geocode} not found")
 
-    lat, lon = lat_lon['latitude'], lat_lon['longitude']
+    lat, lon = lat_lon["latitude"], lat_lon["longitude"]
     return lat, lon
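
Usage sketch (not part of the diff): a minimal smoke test for the renamed accessors. It assumes `sat.load_dataset` behaves as in the docstring example above and that importing the package registers `CopeBR`/`CopeAR`; the file path and the AR geocode are placeholders.

```python
import satellite.weather as sat

# Hypothetical NetCDF file downloaded from the Copernicus API.
ds = sat.load_dataset("data/copernicus_sample.nc")

# BR geocodes are integers (IBGE): Rio de Janeiro below.
rio_df = ds.CopeBR.to_dataframe(3304557)
rio_ds = ds.CopeBR.geocode_ds(3304557, raw=False)

# AR geocodes are strings; "06007" is a placeholder value.
ar_df = ds.CopeAR.to_dataframe("06007")
```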
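The per-geocode `to_sql` path reads, converts and inserts one geocode at a time, which keeps memory bounded for long geocode lists. A sketch, with a placeholder connection URI, schema and table name:

```python
from sqlalchemy import create_engine

import satellite.weather as sat

ds = sat.load_dataset("data/copernicus_sample.nc")  # placeholder path
engine = create_engine("postgresql://user:pass@localhost:5432/weather")  # placeholder URI

ds.CopeBR.to_sql(
    geocodes=[3304557, 3550308],  # Rio de Janeiro, São Paulo
    con=engine,
    tablename="copernicus_br",  # placeholder table
    schema="weather",  # placeholder schema
    raw=False,
    verbose=True,  # logs one line per geocode, as added above
)
```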
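The DSEI accessor requires the polygons to be loaded before slicing; the DSEI name used below is an assumed example, listable via `.DSEI.info()`:

```python
ds.DSEI.load_polygons()  # logs "DSEI Polygons loaded"
polygon = ds.DSEI.get_polygon("Yanomami")  # assumed DSEI name
yanomami_ds = ds.DSEI["Yanomami"]
```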