From 20fad42e11e0bfd9675402a5de7ae702d578ecc3 Mon Sep 17 00:00:00 2001 From: garciam Date: Fri, 23 Feb 2024 14:22:40 +0100 Subject: [PATCH 1/2] keep the session open only when we need it, closing it before the data is retrieved. --- cdsobs/cli/_retrieve.py | 16 +++++++--------- cdsobs/forms_jsons.py | 6 +----- cdsobs/retrieve/api.py | 21 +++++++++++---------- cdsobs/sanity_checks.py | 2 +- tests/cli/test_app.py | 4 +--- tests/retrieve/test_api.py | 10 +++++----- tests/system/1_year_benchmarks.py | 7 ++++--- 7 files changed, 30 insertions(+), 36 deletions(-) diff --git a/cdsobs/cli/_retrieve.py b/cdsobs/cli/_retrieve.py index 6900f96..cf0ea8f 100644 --- a/cdsobs/cli/_retrieve.py +++ b/cdsobs/cli/_retrieve.py @@ -7,7 +7,6 @@ from cdsobs.cli._utils import CliException, ConfigNotFound, config_yml_typer from cdsobs.config import validate_config -from cdsobs.observation_catalogue.database import get_session from cdsobs.retrieve.api import retrieve_observations from cdsobs.retrieve.models import RetrieveArgs from cdsobs.storage import S3Client @@ -61,13 +60,12 @@ def retrieve( raise ConfigNotFound() config = validate_config(cdsobs_config_yml) s3_client = S3Client.from_config(config.s3config) - with get_session(config.catalogue_db) as session: - output_file = retrieve_observations( - session, - s3_client.public_url_base, - retrieve_args, - output_dir, - size_limit, - ) + output_file = retrieve_observations( + config.catalogue_db.get_url(), + s3_client.public_url_base, + retrieve_args, + output_dir, + size_limit, + ) console = Console() console.print(f"[green] Successfully downloaded {output_file} [/green]") diff --git a/cdsobs/forms_jsons.py b/cdsobs/forms_jsons.py index 0ccfceb..51b560c 100644 --- a/cdsobs/forms_jsons.py +++ b/cdsobs/forms_jsons.py @@ -78,11 +78,7 @@ def get_variables_json(dataset: str, output_path: Path) -> Path: def get_constraints_json(session, output_path: Path, dataset) -> Path: - """ - JSON file with the constraints in compressed form. - - Beware this in the need of some optimization (may be resource heavy). - """ + """JSON file with the constraints in compressed form.""" # This is probably slow, can it be improved? catalogue_entries = get_catalogue_entries_stream(session, dataset) merged_constraints = merged_constraints_table(catalogue_entries) diff --git a/cdsobs/retrieve/api.py b/cdsobs/retrieve/api.py index 05590ff..3127f92 100644 --- a/cdsobs/retrieve/api.py +++ b/cdsobs/retrieve/api.py @@ -13,7 +13,6 @@ import pandas import xarray from fsspec.implementations.http import HTTPFileSystem -from sqlalchemy.orm import Session from cdsobs.cdm.lite import cdm_lite_variables from cdsobs.constants import TIME_UNITS_REFERENCE_DATE @@ -27,14 +26,14 @@ from cdsobs.retrieve.retrieve_services import estimate_data_size, ezclump from cdsobs.service_definition.api import get_service_definition from cdsobs.utils.logutils import SizeError, get_logger -from cdsobs.utils.utils import get_code_mapping +from cdsobs.utils.utils import get_code_mapping, get_database_session logger = get_logger(__name__) MAX_NUMBER_OF_GROUPS = 10 def retrieve_observations( - session: Session, + catalogue_url: str, storage_url: str, retrieve_args: RetrieveArgs, output_dir: Path, @@ -45,8 +44,9 @@ def retrieve_observations( Parameters ---------- - session: - Session in the catalogue database + catalogue_url: + URL of the catalogue database including credentials, in the form of + "postgresql+psycopg2://someuser:somepass@hostname:port/catalogue" storage_url: Storage URL retrieve_args : @@ -58,11 +58,12 @@ def retrieve_observations( """ logger.info("Starting retrieve pipeline.") # Query the storage to get the URLS of the files that contain the data requested - catalogue_repository = CatalogueRepository(session) - entries = _get_catalogue_entries(catalogue_repository, retrieve_args) - object_urls = _get_urls_and_check_size( - entries, retrieve_args, size_limit, storage_url - ) + with get_database_session(catalogue_url) as session: + catalogue_repository = CatalogueRepository(session) + entries = _get_catalogue_entries(catalogue_repository, retrieve_args) + object_urls = _get_urls_and_check_size( + entries, retrieve_args, size_limit, storage_url + ) # Get the path of the output file output_path_netcdf = _get_output_path(output_dir, retrieve_args.dataset, "netCDF") # First we always write the netCDF-lite file diff --git a/cdsobs/sanity_checks.py b/cdsobs/sanity_checks.py index d4bd89d..5d06269 100644 --- a/cdsobs/sanity_checks.py +++ b/cdsobs/sanity_checks.py @@ -71,7 +71,7 @@ def _sanity_check_dataset( check_if_missing_in_object_storage(catalogue_repo, s3_client, dataset_name) # Retrieve and check output output_path = retrieve_observations( - session, + config.catalogue_db.get_url(), s3_client.public_url_base, retrieve_args, Path(tmpdir), diff --git a/tests/cli/test_app.py b/tests/cli/test_app.py index 4e1e598..f809039 100644 --- a/tests/cli/test_app.py +++ b/tests/cli/test_app.py @@ -31,7 +31,7 @@ def test_cli_make_production(verbose): assert result.exit_code == 0 -@pytest.mark.skip(reason="this test does not reset db after running") +# @pytest.mark.skip(reason="this test does not reset db after running") def test_cli_retrieve(tmp_path, test_repository): runner = CliRunner() test_json_str = """[ @@ -61,8 +61,6 @@ def test_cli_retrieve(tmp_path, test_repository): CONFIG_YML, "--output-dir", str(tmp_path), - "--np", - "2", ] result = runner.invoke( app, diff --git a/tests/retrieve/test_api.py b/tests/retrieve/test_api.py index 5eb0c33..195b2d0 100644 --- a/tests/retrieve/test_api.py +++ b/tests/retrieve/test_api.py @@ -7,7 +7,6 @@ from cdsobs.config import CDSObsConfig from cdsobs.constants import CONFIG_YML -from cdsobs.observation_catalogue.database import get_session from cdsobs.retrieve.api import retrieve_observations from cdsobs.retrieve.models import RetrieveArgs from cdsobs.storage import S3Client @@ -23,7 +22,9 @@ @pytest.mark.parametrize("oformat,dataset_source,time_coverage", PARAMETRIZE_VALUES) -def test_retrieve(test_repository, tmp_path, oformat, dataset_source, time_coverage): +def test_retrieve( + test_repository, test_config, tmp_path, oformat, dataset_source, time_coverage +): dataset_name = "insitu-observations-woudc-ozone-total-column-and-profiles" start_year, end_year = get_test_years(dataset_source) if dataset_source == "OzoneSonde": @@ -52,7 +53,7 @@ def test_retrieve(test_repository, tmp_path, oformat, dataset_source, time_cover retrieve_args = RetrieveArgs(dataset=dataset_name, params=params) start = datetime.now() output_file = retrieve_observations( - test_repository.catalogue_repository.session, + test_config.catalogue_db.get_url(), test_repository.s3_client.base, retrieve_args, tmp_path, @@ -86,10 +87,9 @@ def test_retrieve_cuon(): ], } retrieve_args = RetrieveArgs(dataset=dataset_name, params=params) - session = get_session(test_config.catalogue_db) s3_client = S3Client.from_config(test_config.s3config) output_file = retrieve_observations( - session, + test_config.catalogue_db.get_url(), s3_client.base, retrieve_args, Path("/tmp"), diff --git a/tests/system/1_year_benchmarks.py b/tests/system/1_year_benchmarks.py index 5602e06..e21a7a8 100644 --- a/tests/system/1_year_benchmarks.py +++ b/tests/system/1_year_benchmarks.py @@ -95,8 +95,9 @@ def main(): retrieve_args = RetrieveArgs(dataset=dataset_name, params=params) s3_client = S3Client.from_config(config.s3config) start_time = time.perf_counter() + catalogue_url = config.catalogue_db.get_url() retrieve_funct( - session, + catalogue_url, s3_client.public_url_base, retrieve_args, tmpdir, @@ -127,7 +128,7 @@ def main(): retrieve_args = RetrieveArgs(dataset=dataset_name, params=params) start_time = time.perf_counter() retrieve_funct( - session, + catalogue_url, s3_client.public_url_base, retrieve_args, tmpdir, @@ -151,7 +152,7 @@ def main(): retrieve_args = RetrieveArgs(dataset=dataset_name, params=params) start_time = time.perf_counter() retrieve_funct( - session, + catalogue_url, s3_client.public_url_base, retrieve_args, tmpdir, From c9153679b491fffce263cc06ee5b54daf6966929 Mon Sep 17 00:00:00 2001 From: garciam Date: Fri, 23 Feb 2024 15:08:46 +0100 Subject: [PATCH 2/2] fixed json print format --- cdsobs/cli/_catalogue_explorer.py | 6 +++--- tests/cli/test_catalogue_explorer.py | 11 +++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/cdsobs/cli/_catalogue_explorer.py b/cdsobs/cli/_catalogue_explorer.py index 39824ed..5246510 100644 --- a/cdsobs/cli/_catalogue_explorer.py +++ b/cdsobs/cli/_catalogue_explorer.py @@ -94,10 +94,10 @@ def list_catalogue( # with pagination (50 per page) results = list_catalogue_(session, filters, page) - if len(results) == 0: - raise RuntimeError("No catalogue entries found for these parameters.") + if len(results) == 0: + raise RuntimeError("No catalogue entries found for these parameters.") - print_db_results(results, print_format) + print_db_results(results, print_format) def list_catalogue_( diff --git a/tests/cli/test_catalogue_explorer.py b/tests/cli/test_catalogue_explorer.py index 8219d37..14ef675 100644 --- a/tests/cli/test_catalogue_explorer.py +++ b/tests/cli/test_catalogue_explorer.py @@ -1,3 +1,4 @@ +import pytest from typer.testing import CliRunner from cdsobs.cli._catalogue_explorer import list_catalogue_ @@ -8,10 +9,11 @@ runner = CliRunner() -def test_list_catalogue(test_session, test_repository): +@pytest.mark.parametrize("print_format", ["table", "json"]) +def test_list_catalogue(test_session, test_repository, print_format): result = runner.invoke( app, - ["list-catalogue", "-c", CONFIG_YML], + ["list-catalogue", "-c", CONFIG_YML, "--print-format", print_format], catch_exceptions=False, ) assert result.exit_code == 0 @@ -26,10 +28,11 @@ def test_catalogue_dataset_info(test_session, test_repository): assert result.exit_code == 0 -def test_list_datasets(): +@pytest.mark.parametrize("print_format", ["table", "json"]) +def test_list_datasets(print_format): result = runner.invoke( app, - ["list-datasets", "-c", CONFIG_YML, "--print-format", "json"], + ["list-datasets", "-c", CONFIG_YML, "--print-format", print_format], catch_exceptions=False, ) assert result.exit_code == 0