CEPH (#47)
* allow multiple cache files urlpaths

* change env var

* rename

* expand env vars

* use subfolders

* test now

* handle depth

* cast to int

* fix chmod only on s3

* allow cads-adaptors to directly write on the cache filesystem

---------

Co-authored-by: Francesco Nazzaro <[email protected]>
malmans2 and francesconazzaro authored Sep 13, 2024
1 parent 5372e00 commit 929eea1
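The thread running through these changes is a new data volumes config file, pointed to by the DATA_VOLUMES_CONFIG environment variable: a plain-text file with one cache files urlpath per line, environment variables expanded on read (see cads_worker/utils.py below). A minimal sketch of the format, with made-up volume names:

```python
import pathlib

from cads_worker import utils

# Hypothetical contents: one cache files urlpath per line; $VAR and ${VAR}
# are expanded with os.path.expandvars when the file is parsed.
pathlib.Path("data-volumes.config").write_text(
    "s3://cache-volume-0\n"
    "s3://cache-volume-1\n"
    "$SCRATCH/local-cache\n"
)
print(utils.parse_data_volumes_config("data-volumes.config"))
# with SCRATCH=/scratch: ['s3://cache-volume-0', 's3://cache-volume-1', '/scratch/local-cache']
```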
Showing 5 changed files with 135 additions and 34 deletions.
23 changes: 15 additions & 8 deletions cads_worker/entry_points.py
@@ -7,7 +7,7 @@
 import typer
 from typer import Option
 
-from . import config
+from . import config, utils
 
 config.configure_logger()
 LOGGER = structlog.get_logger(__name__)
@@ -28,24 +28,31 @@ class CleanerKwargs(TypedDict):
     delete_unknown_files: bool
     lock_validity_period: float
     use_database: bool
+    depth: int
 
 
 def _cache_cleaner() -> None:
-    cache_bucket = os.environ.get("CACHE_BUCKET", None)
     use_database = strtobool(os.environ.get("USE_DATABASE", "1"))
     cleaner_kwargs = CleanerKwargs(
         maxsize=int(os.environ.get("MAX_SIZE", 1_000_000_000)),
         method=os.environ.get("METHOD", "LRU"),
         delete_unknown_files=not use_database,
         lock_validity_period=float(os.environ.get("LOCK_VALIDITY_PERIOD", 86400)),
         use_database=use_database,
+        depth=int(os.getenv("CACHE_DEPTH", 2)),
     )
-    LOGGER.info("Running cache cleaner", cache_bucket=cache_bucket, **cleaner_kwargs)
-    try:
-        cacholote.clean_cache_files(**cleaner_kwargs)
-    except Exception:
-        LOGGER.exception("cache_cleaner crashed")
-        raise
+    for cache_files_urlpath in utils.parse_data_volumes_config():
+        cacholote.config.set(cache_files_urlpath=cache_files_urlpath)
+        LOGGER.info(
+            "Running cache cleaner",
+            cache_files_urlpath=cache_files_urlpath,
+            **cleaner_kwargs,
+        )
+        try:
+            cacholote.clean_cache_files(**cleaner_kwargs)
+        except Exception:
+            LOGGER.exception("cache_cleaner crashed")
+            raise
 
 
 def _add_tzinfo(timestamp: datetime.datetime) -> datetime.datetime:
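For orientation, a minimal sketch of how the rewritten cleaner might be driven; the environment variable names are the ones read in _cache_cleaner above, and every value is illustrative only:

```python
import os

from cads_worker import entry_points

os.environ["DATA_VOLUMES_CONFIG"] = "/etc/cads/data-volumes.config"  # made-up path
os.environ["MAX_SIZE"] = "500000000"  # target size per volume, in bytes
os.environ["METHOD"] = "LRU"  # eviction order, passed through to cacholote
os.environ["USE_DATABASE"] = "1"  # trust the cache DB; keep unknown files
os.environ["CACHE_DEPTH"] = "2"  # volumes are organized in per-day subfolders

# Iterates over every urlpath listed in the config, pointing cacholote at
# each volume in turn before calling cacholote.clean_cache_files.
entry_points._cache_cleaner()
```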
35 changes: 35 additions & 0 deletions cads_worker/utils.py
@@ -0,0 +1,35 @@
+import contextlib
+import os
+import pathlib
+import tempfile
+from collections.abc import Iterator
+
+
+def parse_data_volumes_config(path: str | None = None) -> list[str]:
+    if path is None:
+        path = os.environ["DATA_VOLUMES_CONFIG"]
+
+    with open(path) as fp:
+        return [os.path.expandvars(line.rstrip("\n")) for line in fp]
+
+
+@contextlib.contextmanager
+def enter_tmp_working_dir() -> Iterator[str]:
+    old_cwd = os.getcwd()
+    with tempfile.TemporaryDirectory() as tmpdir:
+        os.chdir(tmpdir)
+        try:
+            yield os.getcwd()
+        finally:
+            os.chdir(old_cwd)
+
+
+@contextlib.contextmanager
+def make_cache_tmp_path(base_dir: str) -> Iterator[pathlib.Path]:
+    with tempfile.TemporaryDirectory(dir=base_dir) as tmpdir:
+        cache_tmp_path = pathlib.Path(tmpdir)
+        cache_tmp_path.with_suffix(".lock").touch()
+        try:
+            yield cache_tmp_path
+        finally:
+            cache_tmp_path.with_suffix(".lock").unlink(missing_ok=True)
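make_cache_tmp_path is what lets cads-adaptors write directly on the cache filesystem (the last bullet of the commit message): the adaptor receives a scratch directory created on the cache volume itself, and the sibling .lock file presumably keeps the cleaner's lock_validity_period logic from evicting a directory that is still being written. A usage sketch mirroring worker.py below:

```python
import cacholote

from cads_worker import utils

# Run a job from a throwaway working directory and hand the adaptor a
# lock-guarded scratch path; on a local ("file") cache the scratch dir can
# live on the cache volume itself, otherwise it stays on local disk.
fs, dirname = cacholote.utils.get_cache_files_fs_dirname()
with utils.enter_tmp_working_dir() as working_dir:
    base_dir = dirname if "file" in fs.protocol else working_dir
    with utils.make_cache_tmp_path(base_dir) as cache_tmp_path:
        ...  # adaptor writes into cache_tmp_path; the .lock vanishes on exit
```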
68 changes: 42 additions & 26 deletions cads_worker/worker.py
@@ -1,7 +1,8 @@
+import datetime
 import functools
 import os
+import random
 import socket
-import tempfile
 from typing import Any
 
 import cacholote
@@ -11,7 +12,7 @@
 import structlog
 from distributed import get_worker
 
-from . import config
+from . import config, utils
 
 config.configure_logger()
 
@@ -192,37 +193,52 @@ def submit_workflow(
     config.update(system_request.adaptor_properties.config)
 
     structlog.contextvars.bind_contextvars(event_type="DATASET_COMPUTE", job_id=job_id)
+
+    cache_files_urlpath = random.choice(utils.parse_data_volumes_config())
+    depth = int(os.getenv("CACHE_DEPTH", 1))
+    if depth == 2:
+        cache_files_urlpath = os.path.join(
+            cache_files_urlpath, datetime.date.today().isoformat()
+        )
+    elif depth != 1:
+        context.warn(f"CACHE_DEPTH={depth} is not supported.")
+
     logger.info("Processing job", job_id=job_id)
+    collection_id = config.get("collection_id")
     cacholote.config.set(
         logger=LOGGER,
         cache_db_urlpath=None,
         create_engine_kwargs={},
+        cache_files_urlpath=cache_files_urlpath,
         sessionmaker=context.session_maker,
         context=context,
+        tag=collection_id,
     )
+    fs, dirname = cacholote.utils.get_cache_files_fs_dirname()
 
     adaptor_class = cads_adaptors.get_adaptor_class(entry_point, setup_code)
-    adaptor = adaptor_class(form=form, context=context, **config)
-    collection_id = config.get("collection_id")
-    cwd = os.getcwd()
-    with tempfile.TemporaryDirectory() as tmpdir:
-        os.chdir(tmpdir)
-        try:
-            request = {k: request[k] for k in sorted(request.keys())}
-            with cacholote.config.set(tag=collection_id):
-                result = cacholote.cacheable(
-                    adaptor.retrieve, collection_id=collection_id
-                )(request=request)
-        except Exception as err:
-            logger.exception(job_id=job_id, event_type="EXCEPTION")
-            context.add_user_visible_error(
-                f"The job failed with: {err.__class__.__name__}"
-            )
-            context.error(f"{err.__class__.__name__}: {str(err)}")
-            raise
-        finally:
-            os.chdir(cwd)
-    fs, _ = cacholote.utils.get_cache_files_fs_dirname()
-    fs.chmod(result.result["args"][0]["file:local_path"], acl="public-read")
+    try:
+        with utils.enter_tmp_working_dir() as working_dir:
+            base_dir = dirname if "file" in fs.protocol else working_dir
+            with utils.make_cache_tmp_path(base_dir) as cache_tmp_path:
+                adaptor = adaptor_class(
+                    form=form,
+                    context=context,
+                    cache_tmp_path=cache_tmp_path,
+                    **config,
+                )
+                request = {k: request[k] for k in sorted(request.keys())}
+                cached_retrieve = cacholote.cacheable(
+                    adaptor.retrieve,
+                    collection_id=collection_id,
+                )
+                result = cached_retrieve(request=request)
+    except Exception as err:
+        logger.exception(job_id=job_id, event_type="EXCEPTION")
+        context.add_user_visible_error(f"The job failed with: {err.__class__.__name__}")
+        context.error(f"{err.__class__.__name__}: {str(err)}")
+        raise
 
+    if "s3" in fs.protocol:
+        fs.chmod(result.result["args"][0]["file:local_path"], acl="public-read")
     with context.session_maker() as session:
         request = cads_broker.database.set_request_cache_id(
             request_uid=job_id,
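Net effect of the retrieval changes: each job picks one cache volume at random, and with CACHE_DEPTH=2 its files land in a per-day subfolder of that volume. A sketch of the path selection, with made-up volume names:

```python
import datetime
import os
import random

volumes = ["s3://cache-volume-0", "s3://cache-volume-1"]  # made-up names
cache_files_urlpath = random.choice(volumes)
if int(os.getenv("CACHE_DEPTH", 1)) == 2:
    # e.g. "s3://cache-volume-1/2024-09-13" for a job run on that day
    cache_files_urlpath = os.path.join(
        cache_files_urlpath, datetime.date.today().isoformat()
    )
```

Note the differing defaults: the worker falls back to CACHE_DEPTH=1, while the cleaner above falls back to 2, so deployments relying on the default should set the variable explicitly.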
5 changes: 5 additions & 0 deletions tests/test_10_cache_cleaner.py
@@ -25,6 +25,11 @@ def test_cache_cleaner(
     cached_path = pathlib.Path(cached_open(dummy_path).name)
     assert cached_path.exists()
 
+    # create data volumes config
+    data_volumes_config = tmp_path / "data-volumes.config"
+    data_volumes_config.write_text(cache_files_urlpath)
+    monkeypatch.setenv("DATA_VOLUMES_CONFIG", str(data_volumes_config))
+
     # clean cache
     monkeypatch.setenv("MAX_SIZE", "0")
     monkeypatch.setenv("USE_DATABASE", use_database)
38 changes: 38 additions & 0 deletions tests/test_30_utils.py
@@ -0,0 +1,38 @@
+import os
+import pathlib
+import tempfile
+
+import pytest
+
+from cads_worker import utils
+
+
+def test_utils_parse_data_volumes_config(
+    tmp_path: pathlib.Path,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    monkeypatch.setenv("FOO", "foo")
+    monkeypatch.setenv("BAR", "bar")
+    data_volumes_config = tmp_path / "data-volumes.config"
+    data_volumes_config.write_text("$FOO\n${BAR}")
+    assert utils.parse_data_volumes_config(str(data_volumes_config)) == ["foo", "bar"]
+
+    monkeypatch.setenv("DATA_VOLUMES_CONFIG", str(data_volumes_config))
+    assert utils.parse_data_volumes_config(None) == ["foo", "bar"]
+
+
+def test_utils_enter_tmp_working_dir() -> None:
+    with utils.enter_tmp_working_dir() as tmp_working_dir:
+        assert os.getcwd() == tmp_working_dir
+        assert os.path.dirname(tmp_working_dir) == os.path.realpath(
+            tempfile.gettempdir()
+        )
+    assert not os.path.exists(tmp_working_dir)
+
+
+def test_utils_make_cache_tmp_path(tmp_path: pathlib.Path) -> None:
+    with utils.make_cache_tmp_path(str(tmp_path)) as cache_tmp_path:
+        assert cache_tmp_path.parent == tmp_path
+        assert cache_tmp_path.with_suffix(".lock").exists()
+    assert not cache_tmp_path.exists()
+    assert not cache_tmp_path.with_suffix(".lock").exists()
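The last two assertions pin down the lock lifecycle: the .lock sibling exists only while the context manager is open. Its name comes from pathlib's with_suffix, which appends a suffix to the directory's name (the temporary directory name below is made up):

```python
import pathlib

tmpdir = pathlib.Path("/cache-volume-0/tmp3a9xk2")  # hypothetical TemporaryDirectory
print(tmpdir.with_suffix(".lock"))  # /cache-volume-0/tmp3a9xk2.lock
```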
