create multiple buckets at init (#145)
* create multiple buckets at init

* removed dependency of cads_worker from entry_points

* kill job on workers

* killed job

* qa

* Enhance job termination process on workers to handle multiple processes and improve error logging

* Refactor logging in job termination process for improved readability

* Refactor logging in job termination process for improved readability

* improve log message

---------

Co-authored-by: Francesco Nazzaro <[email protected]>
alex75 and francesconazzaro authored Jan 30, 2025
1 parent c8ac301 commit 94b98f6
Showing 4 changed files with 41 additions and 12 deletions.
3 changes: 2 additions & 1 deletion cads_broker/dispatcher.py
@@ -113,7 +113,7 @@ def kill_job_on_worker(client: distributed.Client, request_uid: str) -> None:
             )
         except (KeyError, NameError):
             logger.warning(
-                "worker not found while killing a job", job_id=request_uid, pid=pid, worker_ip=worker_ip
+                "worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip
             )


@@ -450,6 +450,7 @@ def sync_database(self, session: sa.orm.Session) -> None:
                 # try to cancel the job directly on the scheduler
                 cancel_jobs_on_scheduler(self.client, job_ids=[request.request_uid])
                 kill_job_on_worker(self.client, request.request_uid)
+                kill_job_on_worker(self.client, request.request_uid)
                 session = self.manage_dismissed_request(request, session)
                 session.commit()
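
The dispatcher change has two parts: the structured-log event emitted when a worker cannot be found is shortened (the context already travels in the job_id, pid and worker_ip fields), and the dismission path now also kills the job on the worker after cancelling it on the scheduler. Below is a minimal sketch of how such a kill helper can be built on dask's distributed API; the kill_process_on_worker helper and the explicit list of (worker_ip, pid) pairs are assumptions for illustration, since this diff only shows the warning branch:

```python
import os
import signal

import distributed
import structlog

logger = structlog.get_logger(__name__)


def kill_process_on_worker(pid: int) -> None:
    # runs on the worker itself: terminate the process executing the job
    os.kill(pid, signal.SIGTERM)


def kill_job_on_worker(
    client: distributed.Client,
    request_uid: str,
    tasks: list[tuple[str, int]],  # assumed (worker_ip, pid) pairs for this request
) -> None:
    # a dismissed job may have fanned out into several processes,
    # so every tracked (worker_ip, pid) pair is terminated
    for worker_ip, pid in tasks:
        try:
            client.run(kill_process_on_worker, pid, workers=[worker_ip], nanny=True)
            logger.info(
                "killed job on worker", job_id=request_uid, pid=pid, worker_ip=worker_ip
            )
        except (KeyError, NameError):
            # the worker may have restarted or vanished in the meantime
            logger.warning(
                "worker not found", job_id=request_uid, pid=pid, worker_ip=worker_ip
            )
```
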
12 changes: 7 additions & 5 deletions cads_broker/entry_points.py
@@ -7,7 +7,7 @@
 import time
 import uuid
 from pathlib import Path
-from typing import Any, Optional
+from typing import Any, List, Optional
 
 import prettytable
 import sqlalchemy as sa
@@ -316,10 +316,12 @@ def init_db(connection_string: Optional[str] = None, force: bool = False) -> None:
         "aws_access_key_id": os.environ["STORAGE_ADMIN"],
         "aws_secret_access_key": os.environ["STORAGE_PASSWORD"],
     }
-    object_storage.create_download_bucket(
-        os.environ.get("CACHE_BUCKET", "cache"), object_storage_url, **storage_kws
-    )
-    print("successfully created the cache area in the object storage.")
+    download_buckets: List[str] = object_storage.parse_data_volumes_config()
+    for download_bucket in download_buckets:
+        object_storage.create_download_bucket(
+            download_bucket, object_storage_url, **storage_kws
+        )
+    print("successfully created the cache areas in the object storage.")
 
 
 @app.command()
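
init_db previously created a single bucket named by CACHE_BUCKET; it now creates one bucket per entry of the data-volumes config file. A usage sketch of the new flow (the config path, endpoint URL and credentials are placeholders, not values from the repository):

```python
import os

from cads_broker import object_storage

# hypothetical config file: one bucket per line
os.environ["DATA_VOLUMES_CONFIG"] = "/tmp/data_volumes.config"
with open(os.environ["DATA_VOLUMES_CONFIG"], "w") as fp:
    fp.write("s3://mybucket1\ns3://mybucket2\n")

# placeholder credentials, normally taken from STORAGE_ADMIN / STORAGE_PASSWORD
storage_kws = {
    "aws_access_key_id": "storage-admin",
    "aws_secret_access_key": "storage-password",
}
for bucket in object_storage.parse_data_volumes_config():
    object_storage.create_download_bucket(
        bucket, "http://localhost:9000", **storage_kws
    )
```
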
23 changes: 21 additions & 2 deletions cads_broker/object_storage.py
@@ -1,5 +1,7 @@
 """utility module to interface to the object storage."""
 
+import os.path
+import urllib.parse
 from typing import Any
 
 import boto3  # type: ignore
@@ -9,6 +11,18 @@
 logger: structlog.stdlib.BoundLogger = structlog.get_logger(__name__)
 
 
+def parse_data_volumes_config(path: str | None = None) -> list[str]:
+    if path is None:
+        path = os.environ["DATA_VOLUMES_CONFIG"]
+
+    data_volumes = []
+    with open(path) as fp:
+        for line in fp:
+            if data_volume := os.path.expandvars(line.rstrip("\n")):
+                data_volumes.append(data_volume)
+    return data_volumes
+
+
 def is_bucket_existing(client: Any, bucket_name: str) -> bool | None:
     """Return True if the bucket exists."""
     try:
@@ -43,13 +57,18 @@ def create_download_bucket(
 
     Parameters
     ----------
-    bucket_name: name of the bucket
+    bucket_name: name of the bucket (something like 's3://mybucketname' or just 'mybucketname')
     object_storage_url: endpoint URL of the object storage
     client: client to use, default is boto3 (used for testing)
     storage_kws: dictionary of parameters used to pass to the storage client.
     """
+    bucket_url_obj = urllib.parse.urlparse(bucket_name)
+    scheme = "s3"
+    if bucket_url_obj.scheme:
+        scheme = bucket_url_obj.scheme
+        bucket_name = bucket_url_obj.netloc
     if not client:
-        client = boto3.client("s3", endpoint_url=object_storage_url, **storage_kws)
+        client = boto3.client(scheme, endpoint_url=object_storage_url, **storage_kws)
     if not is_bucket_existing(client, bucket_name):
         logger.info(f"creation of bucket {bucket_name}")
         client.create_bucket(Bucket=bucket_name)
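
Two behaviours introduced here are worth spelling out: parse_data_volumes_config expands environment variables and drops lines that expand to the empty string, and create_download_bucket accepts either a bare bucket name or a scheme-qualified URL, falling back to "s3" when no scheme is given. A small self-contained check of both, with hypothetical bucket names and a temporary config file:

```python
import os
import tempfile
from urllib.parse import urlparse

from cads_broker import object_storage

# scheme-qualified vs bare names, as interpreted in create_download_bucket
assert urlparse("s3://mybucketname").scheme == "s3"
assert urlparse("s3://mybucketname").netloc == "mybucketname"
assert urlparse("mybucketname").scheme == ""  # no scheme: "s3" is used as default

# env-var expansion and blank-line filtering in parse_data_volumes_config
os.environ["PRIVATE_BUCKET"] = "s3://private-downloads"  # hypothetical name
with tempfile.NamedTemporaryFile("w", suffix=".config", delete=False) as fp:
    fp.write("s3://public-downloads\n\n${PRIVATE_BUCKET}\n")

assert object_storage.parse_data_volumes_config(fp.name) == [
    "s3://public-downloads",
    "s3://private-downloads",
]
```
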
15 changes: 11 additions & 4 deletions tests/test_90_entry_points.py
@@ -1,6 +1,8 @@
 import datetime
 import json
 import logging
+import os
+import unittest.mock
 import uuid
 from typing import Any
 
@@ -60,8 +62,11 @@ def mock_config(
     return adaptor_properties
 
 
-def test_init_db(postgresql: Connection[str], mocker) -> None:
+def test_init_db(postgresql: Connection[str], tmpdir, mocker) -> None:
     patch_storage = mocker.patch.object(object_storage, "create_download_bucket")
+    data_volumes_config_path = os.path.join(str(tmpdir), "data_volumes.config")
+    with open(data_volumes_config_path, "w") as fp:
+        fp.writelines(["s3://mybucket1\n", "s3://mybucket2\n"])
     connection_string = (
         f"postgresql://{postgresql.info.user}:"
         f"@{postgresql.info.host}:{postgresql.info.port}/{postgresql.info.dbname}"
@@ -83,12 +88,14 @@ def test_init_db(postgresql: Connection[str], mocker) -> None:
             "OBJECT_STORAGE_URL": object_storage_url,
             "STORAGE_ADMIN": object_storage_kws["aws_access_key_id"],
             "STORAGE_PASSWORD": object_storage_kws["aws_secret_access_key"],
+            "DATA_VOLUMES_CONFIG": data_volumes_config_path,
         },
     )
     assert result.exit_code == 0
-    patch_storage.assert_called_once_with(
-        "cache", object_storage_url, **object_storage_kws
-    )
+    assert patch_storage.mock_calls == [
+        unittest.mock.call("s3://mybucket1", object_storage_url, **object_storage_kws),
+        unittest.mock.call("s3://mybucket2", object_storage_url, **object_storage_kws),
+    ]
     assert set(conn.execute(query).scalars()) == set(
         database.BaseModel.metadata.tables
     ).union(
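
The test now prepares a two-line data-volumes file in tmpdir and switches from assert_called_once_with to comparing mock_calls, which verifies both that create_download_bucket ran once per configured bucket and that the calls happened in file order. A stripped-down illustration of that assertion style (the endpoint string is a placeholder):

```python
import unittest.mock

storage = unittest.mock.Mock()
storage("s3://mybucket1", "http://storage:9000")
storage("s3://mybucket2", "http://storage:9000")

# mock_calls records every invocation in order, so this checks the
# bucket names, the shared endpoint, and the creation order at once
assert storage.mock_calls == [
    unittest.mock.call("s3://mybucket1", "http://storage:9000"),
    unittest.mock.call("s3://mybucket2", "http://storage:9000"),
]
```
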
