Skip to content

Commit

Permalink
Merge pull request #10 from BengtRydberg/extend_eumetsat_api
Browse files Browse the repository at this point in the history
Extending the `EumetsatAPI` class
  • Loading branch information
pkhalaj authored Dec 15, 2024
2 parents da9cf72 + ef121bb commit 402bf99
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 23 deletions.
26 changes: 26 additions & 0 deletions examples/fetch_files/custom/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from datetime import datetime
from pathlib import Path

from monkey_wrench.query import EumetsatAPI, EumetsatCollection

# Directory in which the fetched product files will be stored.
output_directory = Path("<replace-with-directory-where-the-files-are-to-be-stored>")
start_datetime = datetime(2021, 1, 1)
end_datetime = datetime(2021, 1, 2)

# Bounding box limits, in degrees: North, South, West, and East.
bounding_box = [74.0, 54.0, 6.0, 26.0]

# The polygon vertices (lon, lat) of a small bounding box in central Sweden.
# NOTE: the first and last vertices coincide, i.e. the polygon ring is closed.
geometry = [
    (14.0, 64.0),
    (16.0, 64.0),
    (16.0, 62.0),
    (14.0, 62.0),
    (14.0, 64.0),
]

api = EumetsatAPI(EumetsatCollection.amsu)
results = api.query(start_datetime, end_datetime, polygon=geometry)
saved_files = api.fetch_products(results, output_directory, bounding_box=bounding_box)

print(f"Fetched {len(saved_files)} files and stored them in {output_directory}.")
File renamed without changes.
File renamed without changes.
17 changes: 10 additions & 7 deletions src/monkey_wrench/date_time/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ def number_of_days_in_month(year: PositiveInt, month: PositiveInt) -> int:


@validate_call
def floor_datetime_minutes_to_snapshots(snapshots: list[Minutes], datetime_instance: datetime) -> datetime:
def floor_datetime_minutes_to_snapshots(
datetime_instance: datetime, snapshots: list[Minutes] | None = None
) -> datetime:
"""Round down or floor the given datetime to the closest minute from the snapshots.
Args:
snapshots:
A sorted list of minutes. As an example, for SEVIRI we have one snapshot per ``15`` minutes, starting
from the 12th minute. As a result, we have ``[12, 27, 42, 57]`` for SEVIRI snapshots in an hour.
datetime_instance:
The datetime instance to floor.
snapshots:
A sorted list of minutes. Defaults to ``None``, which means the given datetime instance will be returned as
it is, without any modifications. As an example, for SEVIRI we have one snapshot per ``15`` minutes,
starting from the 12th minute. As a result, we have ``[12, 27, 42, 57]`` for SEVIRI snapshots in an hour.
Returns:
A datetime instance which is smaller than or equal to ``datetime_instance``, such that the minute matches the
Expand All @@ -71,11 +74,11 @@ def floor_datetime_minutes_to_snapshots(snapshots: list[Minutes], datetime_insta
>>> from datetime import datetime
>>> from monkey_wrench.date_time import floor_datetime_minutes_to_snapshots
>>> seviri_snapshots = [12, 27, 42, 57]
>>> floor_datetime_minutes_to_snapshots(seviri_snapshots, datetime(2020, 1, 1, 0, 3))
>>> floor_datetime_minutes_to_snapshots(datetime(2020, 1, 1, 0, 3), seviri_snapshots)
datetime.datetime(2019, 12, 31, 23, 57)
>>> floor_datetime_minutes_to_snapshots(seviri_snapshots, datetime(2020, 1, 1, 0, 58))
>>> floor_datetime_minutes_to_snapshots(datetime(2020, 1, 1, 0, 58), seviri_snapshots)
datetime.datetime(2020, 1, 1, 0, 57)
>>> floor_datetime_minutes_to_snapshots(seviri_snapshots, datetime(2020, 1, 1, 1, 30))
>>> floor_datetime_minutes_to_snapshots(datetime(2020, 1, 1, 1, 30), seviri_snapshots)
datetime.datetime(2020, 1, 1, 1, 27)
"""
if not snapshots:
Expand Down
1 change: 1 addition & 0 deletions src/monkey_wrench/input_output/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

T = TypeVar("T", DirectoryPath, NewPath, FilePath)
AbsolutePath = Annotated[T, AfterValidator(lambda x: x.absolute())]
"""Type annotation and Pydantic validator to represent (convert to) an absolute path."""

Pattern = str | list[str] | None
"""Type alias for a string or a list of strings that will be used as a pattern to search in other strings."""
Expand Down
125 changes: 116 additions & 9 deletions src/monkey_wrench/query/_api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
"""The modules which defines the class for querying the EUMETSAT API."""

import fnmatch
import shutil
import time
from datetime import datetime, timedelta
from os import environ
from pathlib import Path
from typing import Any, ClassVar, Generator

from eumdac import AccessToken, DataStore
from eumdac import AccessToken, DataStore, DataTailor
from eumdac.collection import SearchResults
from eumdac.product import Product
from eumdac.tailor_models import Chain, RegionOfInterest
from fsspec import open_files
from loguru import logger
from pydantic import ConfigDict, validate_call
from pydantic import ConfigDict, PositiveInt, validate_call
from satpy.readers.utils import FSFile

from monkey_wrench.date_time import (
Expand All @@ -20,6 +26,12 @@
from ._common import Query
from ._meta import EumetsatAPIUrl, EumetsatCollection

BoundingBox = tuple[float, float, float, float]
"""The type alias for a tuple determining the bounds for (North, South, West, East)."""

Polygon = list[tuple[float, float]]
"""The type alias for a list of polygon vertices, where each vertex is a 2-tuple of ``(<longitude>, <latitude>)``."""


class EumetsatAPI(Query):
"""A class with utilities to simplify querying all the product IDs from the EUMETSAT API.
Expand Down Expand Up @@ -68,9 +80,11 @@ def __init__(
.. _API key management: https://api.eumetsat.int/api-key
"""
super().__init__(log_context=log_context)
token = EumetsatAPI.get_token()
self.__collection = collection
self.__datastore = DataStore(EumetsatAPI.get_token())
self.__selected_collection = self.__datastore.get_collection(collection.value.query_string)
self.__data_store = DataStore(token)
self.__data_tailor = DataTailor(token)
self.__selected_collection = self.__data_store.get_collection(collection.value.query_string)

@classmethod
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
Expand Down Expand Up @@ -99,8 +113,22 @@ def len(self, product_ids: SearchResults) -> int:
"""Return the number of product IDs."""
return product_ids.total_results

@staticmethod
def __stringify_polygon(polygon: Polygon | None = None) -> str | None:
    """Convert the given polygon to a string representation which is expected by the API.

    Returns ``None`` when no polygon is given.
    """
    if polygon is None:
        return None

    # Each vertex becomes "<lon> <lat>"; vertices are comma-separated (WKT-like form).
    vertices = (f"{lon} {lat}" for lon, lat in polygon)
    return "POLYGON(({}))".format(",".join(vertices))

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def query(self, start_datetime: datetime, end_datetime: datetime) -> SearchResults:
def query(
self,
start_datetime: datetime,
end_datetime: datetime,
polygon: Polygon | None = None,
) -> SearchResults:
"""Query product IDs in a single batch.
This method wraps around the ``eumdac.Collection().search()`` method to perform a search for product IDs
Expand All @@ -120,6 +148,9 @@ def query(self, start_datetime: datetime, end_datetime: datetime) -> SearchResul
The start datetime (inclusive).
end_datetime:
The end datetime (exclusive).
polygon:
A list of polygon vertices. Each vertex is a 2-tuple of geodetic coordinates, i.e.
``(<longitude>, <latitude>)``.
Returns:
The results of the search, containing the product IDs found within the specified time range.
Expand All @@ -129,10 +160,9 @@ def query(self, start_datetime: datetime, end_datetime: datetime) -> SearchResul
Refer to :func:`~monkey_wrench.date_time.assert_start_time_is_before_end_time`.
"""
assert_start_time_is_before_end_time(start_datetime, end_datetime)
end_datetime = floor_datetime_minutes_to_snapshots(
self.__collection.seviri.value.snapshot_minutes, end_datetime
)
return self.__selected_collection.search(dtstart=start_datetime, dtend=end_datetime)
end_datetime = floor_datetime_minutes_to_snapshots(end_datetime, self.__collection.value.snapshot_minutes)
polygon = EumetsatAPI.__stringify_polygon(polygon)
return self.__selected_collection.search(dtstart=start_datetime, dtend=end_datetime, geo=polygon)

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def query_in_batches(
Expand Down Expand Up @@ -190,6 +220,83 @@ def query_in_batches(
expected_total_count=expected_total_count
)

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def fetch_products(
        self,
        search_results,  # TODO: When adding `SearchResults` as the type, making the documentation fails!
        output_directory: Path,
        bounding_box: BoundingBox = (90., -90., -180., 180.),
        output_file_format: str = "netcdf4",
        sleep_time: PositiveInt = 10
) -> list[Path | None]:
    """Fetch all products of a search result and write the product files to disk.

    Args:
        search_results:
            Search results for which the files will be fetched.
        output_directory:
            The directory to save the files in. It will be created (including any missing
            parents) if it does not already exist.
        bounding_box:
            Bounding box, i.e. (north, south, west, east) limits. Defaults to the whole globe.
        output_file_format:
            Desired format of the output file(s). Defaults to ``netcdf4``.
        sleep_time:
            Sleep time, in seconds, between requests. Defaults to ``10`` seconds.

    Returns:
        A list of paths for the fetched files. An item is ``None`` when fetching the
        corresponding product failed.
    """
    # `exist_ok=True` already tolerates a pre-existing directory, so no separate
    # `exists()` check is needed (avoids a race between check and creation).
    output_directory.mkdir(parents=True, exist_ok=True)

    # A single customisation chain is shared by all products of the search results.
    chain = Chain(
        product=search_results.collection.product_type,
        format=output_file_format,
        roi=RegionOfInterest(NSWE=bounding_box)
    )
    return [self.fetch_product(product, chain, output_directory, sleep_time) for product in search_results]

def fetch_product(
        self,
        product: Product,
        chain: Chain,
        output_directory: Path,
        sleep_time: PositiveInt
) -> Path | None:
    """Fetch the file for a single product and write the product file to disk.

    The method polls the Data Tailor customisation job until it reaches a terminal
    state, sleeping ``sleep_time`` seconds between polls.

    Args:
        product:
            The product whose corresponding file will be fetched.
        chain:
            Chain to apply for customization of the output file.
        output_directory:
            The directory to save the file in.
        sleep_time:
            Sleep time, in seconds, between requests.

    Returns:
        The path of the saved file on the disk, otherwise ``None`` in case of a failure.
    """
    customisation = self.__data_tailor.new_customisation(product, chain)
    logger.info(f"Start downloading product {str(product)}")
    while True:
        if "DONE" in customisation.status:
            # Assumes the customisation produces at least one netCDF output — TODO confirm.
            customized_file = fnmatch.filter(customisation.outputs, "*.nc")[0]
            with (
                customisation.stream_output(customized_file) as stream,
                open(output_directory / stream.name, mode="wb") as fdst
            ):
                shutil.copyfileobj(stream, fdst)
                logger.info(f"Wrote file: '{fdst.name}' to disk.")
                return Path(output_directory / stream.name)
        elif customisation.status in ["ERROR", "FAILED", "DELETED", "KILLED", "INACTIVE"]:
            logger.warning(f"Job failed, error code is: '{customisation.status.lower()}'.")
            return None
        else:
            # Covers "QUEUED"/"RUNNING" as well as any unexpected transient status.
            # Sleeping here (instead of only for the two known states) prevents a
            # busy-wait loop if the API reports an unknown status.
            logger.info(f"Job is {customisation.status.lower()}.")
            time.sleep(sleep_time)

@staticmethod
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def open_seviri_native_file_remotely(product_id: str, cache: str | None = None) -> FSFile:
Expand Down
5 changes: 4 additions & 1 deletion src/monkey_wrench/query/_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
class CollectionMeta(NamedTuple):
"""Named tuple to gather the collection metadata."""
query_string: str
snapshot_minutes: list[int]
snapshot_minutes: list[int] | None = None


class EumetsatCollection(Enum):
    """Enum class that defines the collections for the EUMETSAT datastore."""
    # Each member carries the collection's datastore query string; `snapshot_minutes`
    # is only provided for collections with a fixed snapshot schedule (SEVIRI).
    amsu = CollectionMeta(query_string="EO:EUM:DAT:METOP:AMSUL1")
    avhrr = CollectionMeta(query_string="EO:EUM:DAT:METOP:AVHRRL1")
    mhs = CollectionMeta(query_string="EO:EUM:DAT:METOP:MHSL1")
    seviri = CollectionMeta(query_string="EO:EUM:DAT:MSG:HRSEVIRI", snapshot_minutes=[12, 27, 42, 57])


Expand Down
7 changes: 4 additions & 3 deletions src/tests/date_time/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ def test_days_in_a_month(year, month, number_of_days):
([2022, 1, 1, 0, 1], [12, 27, 42, 57], [2021, 12, 31, 23, 57]),
([2022, 3, 4, 10, 41], [12, 27, 42, 57], [2022, 3, 4, 10, 27]),
([2022, 1, 1, 1, 59], [12, 27, 42, 57], [2022, 1, 1, 1, 57]),
([2022, 1, 1, 1, 59], [], [2022, 1, 1, 1, 59])
([2022, 1, 1, 1, 59], [], [2022, 1, 1, 1, 59]),
([2022, 1, 1, 1, 59], None, [2022, 1, 1, 1, 59])
])
def test_floor_datetime_minutes(instance, snapshots, res):
assert datetime(*res) == floor_datetime_minutes_to_snapshots(snapshots, datetime(*instance))
assert datetime(*res) == floor_datetime_minutes_to_snapshots(datetime(*instance), snapshots)


@pytest.mark.parametrize(("snapshots", "error_message"), [
Expand All @@ -54,4 +55,4 @@ def test_floor_datetime_minutes(instance, snapshots, res):
])
def test_floor_datetime_minutes_raise(snapshots, error_message):
with pytest.raises(ValueError, match=error_message):
floor_datetime_minutes_to_snapshots(snapshots, datetime(2022, 1, 1))
floor_datetime_minutes_to_snapshots(datetime(2022, 1, 1), snapshots)
41 changes: 38 additions & 3 deletions src/tests/query/test_api.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,33 @@
from datetime import datetime, timedelta

import pytest
from eumdac.collection import SearchResults
from eumdac.product import Product

from monkey_wrench.date_time import SeviriIDParser
from monkey_wrench.query import EumetsatAPI, EumetsatCollection
from monkey_wrench.test_utils import (
EnvironmentVariables,
)
from monkey_wrench.test_utils import EnvironmentVariables


@pytest.fixture
def api(get_token_or_skip) -> EumetsatAPI:
    """Return an ``EumetsatAPI`` instance for the AMSU collection (skips if no token)."""
    return EumetsatAPI(EumetsatCollection.amsu)


@pytest.fixture
def search_results(api: EumetsatAPI) -> SearchResults:
    """Query the API over a six-hour window and a small polygon in central Sweden."""
    start_datetime = datetime(2021, 1, 1, 0)
    end_datetime = datetime(2021, 1, 1, 6)
    # Polygon vertices (lon, lat); the ring is closed (first == last vertex).
    polygon = [(14.0, 64.0), (16.0, 64.0), (16.0, 62.0), (14.0, 62.0), (14.0, 64.0)]
    return api.query(start_datetime, end_datetime, polygon=polygon)


def test_api_init_raise():
Expand Down Expand Up @@ -48,6 +68,21 @@ def test_api_query_in_batches(get_token_or_skip):
assert 0 == day


def test_fetch_fails(api, search_results, tmp_path):
    """Fetching with a bounding box disjoint from the query polygon must yield ``None``."""
    # (north, south, west, east) limits that do not overlap the queried region.
    disjoint_bbox = [64, 62, 114, 116]
    fetched = api.fetch_products(search_results, tmp_path, bounding_box=disjoint_bbox, sleep_time=1)
    assert 1 == len(fetched)
    assert fetched[0] is None


def test_fetch(api, search_results, tmp_path):
    """Fetching with a bounding box that covers the query polygon must yield a netCDF file."""
    covering_bbox = [70, 60, 10, 20]
    fetched = api.fetch_products(search_results, tmp_path, bounding_box=covering_bbox, sleep_time=1)
    assert 1 == len(fetched)
    saved = fetched[0]
    assert saved.is_file()
    assert ".nc" == saved.suffix


def seviri_product_datetime_is_correct(day: int, product: Product, end_datetime: datetime, start_datetime: datetime):
"""Check that the product datetime is correct."""
datetime_obj = SeviriIDParser.parse(str(product))
Expand Down

0 comments on commit 402bf99

Please sign in to comment.