From 94b98f6b605c588e73af251ec629635fc046a4dc Mon Sep 17 00:00:00 2001
From: Alessio Siniscalchi
Date: Thu, 30 Jan 2025 16:32:02 +0100
Subject: [PATCH] create multiple buckets at init (#145)

* create multiple buckets at init

* removed dependency of cads_worker from entry_points

* kill job on workers

* killed job

* qa

* Enhance job termination process on workers to handle multiple processes and improve error logging

* Refactor logging in job termination process for improved readability

* improve log message

---------

Co-authored-by: Francesco Nazzaro
---
 cads_broker/dispatcher.py     |  3 ++-
 cads_broker/entry_points.py   | 12 +++++++-----
 cads_broker/object_storage.py | 23 +++++++++++++++++++++--
 tests/test_90_entry_points.py | 15 +++++++++++----
 4 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/cads_broker/dispatcher.py b/cads_broker/dispatcher.py
index f2822e05..e9e60e56 100644
--- a/cads_broker/dispatcher.py
+++ b/cads_broker/dispatcher.py
@@ -113,7 +113,7 @@ def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None:
             )
         except (KeyError, NameError):
             logger.warning(
-                "worker not found while killing a job", job_id=request_uid, pid=pid, worker_ip=worker_ip
+                "worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip
             )
 
 
@@ -450,6 +450,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
                 # try to cancel the job directly on the scheduler
                 cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
+                kill_job_on_worker(self.client, request.request_uid)
             session = self.manage_dismissed_request(request, session)
             session.commit()
 
diff --git a/cads_broker/entry_points.py b/cads_broker/entry_points.py
index 83436ba8..d6cd6b63 100644
--- a/cads_broker/entry_points.py
+++ b/cads_broker/entry_points.py
@@ -7,7 +7,7 @@
 import time
 import uuid
 from pathlib import Path
-from typing import Any, Optional
+from typing import Any, List, Optional
 
 import prettytable
 import sqlalchemy as sa
@@ -316,10 +316,12 @@ def init_db(connection_string: Optional[str] = None, force: bool = False) -> None:
         "aws_access_key_id": os.environ["STORAGE_ADMIN"],
         "aws_secret_access_key": os.environ["STORAGE_PASSWORD"],
     }
-    object_storage.create_download_bucket(
-        os.environ.get("CACHE_BUCKET", "cache"), object_storage_url, **storage_kws
-    )
-    print("successfully created the cache area in the object storage.")
+    download_buckets: List[str] = object_storage.parse_data_volumes_config()
+    for download_bucket in download_buckets:
+        object_storage.create_download_bucket(
+            download_bucket, object_storage_url, **storage_kws
+        )
+    print("successfully created the cache areas in the object storage.")
 
 
 @app.command()
diff --git a/cads_broker/object_storage.py b/cads_broker/object_storage.py
index 74a601d5..36717955 100644
--- a/cads_broker/object_storage.py
+++ b/cads_broker/object_storage.py
@@ -1,5 +1,7 @@
 """utility module to interface to the object storage."""
 
+import os.path
+import urllib.parse
 from typing import Any
 
 import boto3  # type: ignore
@@ -9,6 +11,18 @@
 logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__)
 
 
+def parse_data_volumes_config(path: str | None = None) -> list[str]:
+    if path is None:
+        path = os.environ["DATA_VOLUMES_CONFIG"]
+
+    data_volumes = []
+    with open(path) as fp:
+        for line in fp:
+            if data_volume := os.path.expandvars(line.rstrip("\n")):
+                data_volumes.append(data_volume)
+    return data_volumes
+
+
 def is_bucket_existing(client: Any, bucket_name: str) -> bool | None:
     """Return True if the bucket exists."""
     try:
@@ -43,13 +57,18 @@ def create_download_bucket(
 
     Parameters
     ----------
-    bucket_name: name of the bucket
+    bucket_name: name of the bucket (something like 's3://mybucketname' or just 'mybucketname')
     object_storage_url: endpoint URL of the object storage
     client: client to use, default is boto3 (used for testing)
     storage_kws: dictionary of parameters used to pass to the storage client.
     """
+    bucket_url_obj = urllib.parse.urlparse(bucket_name)
+    scheme = "s3"
+    if bucket_url_obj.scheme:
+        scheme = bucket_url_obj.scheme
+        bucket_name = bucket_url_obj.netloc
     if not client:
-        client = boto3.client("s3", endpoint_url=object_storage_url, **storage_kws)
+        client = boto3.client(scheme, endpoint_url=object_storage_url, **storage_kws)
     if not is_bucket_existing(client, bucket_name):
         logger.info(f"creation of bucket {bucket_name}")
         client.create_bucket(Bucket=bucket_name)
diff --git a/tests/test_90_entry_points.py b/tests/test_90_entry_points.py
index 2657094e..e05c9b6c 100644
--- a/tests/test_90_entry_points.py
+++ b/tests/test_90_entry_points.py
@@ -1,6 +1,8 @@
 import datetime
 import json
 import logging
+import os
+import unittest.mock
 import uuid
 from typing import Any
 
@@ -60,8 +62,11 @@ def mock_config(
     return adaptor_properties
 
 
-def test_init_db(postgresql: Connection[str], mocker) -> None:
+def test_init_db(postgresql: Connection[str], tmpdir, mocker) -> None:
     patch_storage = mocker.patch.object(object_storage, "create_download_bucket")
+    data_volumes_config_path = os.path.join(str(tmpdir), "data_volumes.config")
+    with open(data_volumes_config_path, "w") as fp:
+        fp.writelines(["s3://mybucket1\n", "s3://mybucket2\n"])
     connection_string = (
         f"postgresql://{postgresql.info.user}:"
         f"@{postgresql.info.host}:{postgresql.info.port}/{postgresql.info.dbname}"
@@ -83,12 +88,14 @@ def test_init_db(postgresql: Connection[str], mocker) -> None:
             "OBJECT_STORAGE_URL": object_storage_url,
             "STORAGE_ADMIN": object_storage_kws["aws_access_key_id"],
             "STORAGE_PASSWORD": object_storage_kws["aws_secret_access_key"],
+            "DATA_VOLUMES_CONFIG": data_volumes_config_path,
         },
     )
     assert result.exit_code == 0
-    patch_storage.assert_called_once_with(
-        "cache", object_storage_url, **object_storage_kws
-    )
+    assert patch_storage.mock_calls == [
+        unittest.mock.call("s3://mybucket1", object_storage_url, **object_storage_kws),
+        unittest.mock.call("s3://mybucket2", object_storage_url, **object_storage_kws),
+    ]
     assert set(conn.execute(query).scalars()) == set(
         database.BaseModel.metadata.tables
     ).union(
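
Note for reviewers: a minimal sketch of how the new parse_data_volumes_config
helper behaves, assuming a DATA_VOLUMES_CONFIG file with one bucket per line.
The config path and the SECONDARY_BUCKET variable below are illustrative, not
part of the patch; each line is stripped of its trailing newline, passed
through os.path.expandvars(), and empty lines are skipped:

    import os

    from cads_broker import object_storage

    # Hypothetical config file: one data volume per line; lines may
    # reference environment variables.
    with open("/tmp/data_volumes.config", "w") as fp:
        fp.write("s3://mybucket1\ns3://$SECONDARY_BUCKET\n")

    os.environ["SECONDARY_BUCKET"] = "mybucket2"
    os.environ["DATA_VOLUMES_CONFIG"] = "/tmp/data_volumes.config"

    # With no explicit path argument, the helper reads the file named
    # by DATA_VOLUMES_CONFIG.
    print(object_storage.parse_data_volumes_config())
    # ['s3://mybucket1', 's3://mybucket2']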
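
Likewise, a quick sketch of the bucket-name handling added to
create_download_bucket: urllib.parse.urlparse() splits an optional scheme off
the configured name, so both bare names and 's3://'-style URLs are accepted
(the sample names here are illustrative):

    import urllib.parse

    for bucket_name in ["s3://mybucket1", "mybucket2"]:
        parsed = urllib.parse.urlparse(bucket_name)
        # The scheme defaults to "s3" and is what boto3.client() receives.
        scheme = parsed.scheme or "s3"
        # Only a URL-style name carries the bucket in its netloc component;
        # a bare name is used as-is.
        name = parsed.netloc if parsed.scheme else bucket_name
        print(scheme, name)
    # s3 mybucket1
    # s3 mybucket2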