
CEPH #47

Merged · 11 commits · Sep 13, 2024

23 changes: 15 additions & 8 deletions cads_worker/entry_points.py
@@ -7,7 +7,7 @@
import typer
from typer import Option

from . import config
from . import config, utils

config.configure_logger()
LOGGER = structlog.get_logger(__name__)
@@ -28,24 +28,31 @@ class CleanerKwargs(TypedDict):
delete_unknown_files: bool
lock_validity_period: float
use_database: bool
depth: int


def _cache_cleaner() -> None:
cache_bucket = os.environ.get("CACHE_BUCKET", None)
use_database = strtobool(os.environ.get("USE_DATABASE", "1"))
cleaner_kwargs = CleanerKwargs(
maxsize=int(os.environ.get("MAX_SIZE", 1_000_000_000)),
method=os.environ.get("METHOD", "LRU"),
delete_unknown_files=not use_database,
lock_validity_period=float(os.environ.get("LOCK_VALIDITY_PERIOD", 86400)),
use_database=use_database,
depth=int(os.getenv("CACHE_DEPTH", 2)),
)
LOGGER.info("Running cache cleaner", cache_bucket=cache_bucket, **cleaner_kwargs)
try:
cacholote.clean_cache_files(**cleaner_kwargs)
except Exception:
LOGGER.exception("cache_cleaner crashed")
raise
for cache_files_urlpath in utils.parse_data_volumes_config():
cacholote.config.set(cache_files_urlpath=cache_files_urlpath)
LOGGER.info(
"Running cache cleaner",
cache_files_urlpath=cache_files_urlpath,
**cleaner_kwargs,
)
try:
cacholote.clean_cache_files(**cleaner_kwargs)
except Exception:
LOGGER.exception("cache_cleaner crashed")
raise


def _add_tzinfo(timestamp: datetime.datetime) -> datetime.datetime:
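For context, a minimal sketch of how the updated cleaner consumes DATA_VOLUMES_CONFIG — one cache urlpath per line, with shell-style variables expanded on read; the paths and config location below are hypothetical:

import os
import tempfile

from cads_worker import utils

# Hypothetical two-volume config file, one cache urlpath per line.
os.environ["CACHE_ROOT"] = "/tmp/cads-cache"
with tempfile.NamedTemporaryFile("w", suffix=".config", delete=False) as fp:
    fp.write("$CACHE_ROOT/volume-0\n$CACHE_ROOT/volume-1\n")
os.environ["DATA_VOLUMES_CONFIG"] = fp.name

# _cache_cleaner now loops over every configured volume, pointing
# cacholote at each cache_files_urlpath before cleaning it.
for cache_files_urlpath in utils.parse_data_volumes_config():
    print("volume to clean:", cache_files_urlpath)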
35 changes: 35 additions & 0 deletions cads_worker/utils.py
@@ -0,0 +1,35 @@
import contextlib
import os
import pathlib
import tempfile
from collections.abc import Iterator


def parse_data_volumes_config(path: str | None = None) -> list[str]:
if path is None:
path = os.environ["DATA_VOLUMES_CONFIG"]

with open(path) as fp:
return [os.path.expandvars(line.rstrip("\n")) for line in fp]


@contextlib.contextmanager
def enter_tmp_working_dir() -> Iterator[str]:
old_cwd = os.getcwd()
with tempfile.TemporaryDirectory() as tmpdir:
os.chdir(tmpdir)
try:
yield os.getcwd()
finally:
os.chdir(old_cwd)


@contextlib.contextmanager
def make_cache_tmp_path(base_dir: str) -> Iterator[pathlib.Path]:
with tempfile.TemporaryDirectory(dir=base_dir) as tmpdir:
cache_tmp_path = pathlib.Path(tmpdir)
cache_tmp_path.with_suffix(".lock").touch()
try:
yield cache_tmp_path
finally:
cache_tmp_path.with_suffix(".lock").unlink(missing_ok=True)
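A short usage sketch of the two context managers above: enter_tmp_working_dir switches the process into a throwaway working directory, while make_cache_tmp_path yields a temporary cache path guarded by a sibling .lock file for the lifetime of the block (the output filename is hypothetical):

from cads_worker import utils

with utils.enter_tmp_working_dir() as working_dir:
    # the current working directory is now a fresh temporary directory
    with utils.make_cache_tmp_path(working_dir) as cache_tmp_path:
        # cache_tmp_path exists under working_dir, and
        # cache_tmp_path.with_suffix(".lock") sits next to it
        (cache_tmp_path / "result.grib").touch()
# on exit both temporary directories and the lock file are removed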
68 changes: 42 additions & 26 deletions cads_worker/worker.py
@@ -1,7 +1,8 @@
import datetime
import functools
import os
import random
import socket
import tempfile
from typing import Any

import cacholote
@@ -11,7 +12,7 @@
import structlog
from distributed import get_worker

from . import config
from . import config, utils

config.configure_logger()

@@ -192,37 +193,52 @@ def submit_workflow(
config.update(system_request.adaptor_properties.config)

structlog.contextvars.bind_contextvars(event_type="DATASET_COMPUTE", job_id=job_id)

cache_files_urlpath = random.choice(utils.parse_data_volumes_config())
depth = int(os.getenv("CACHE_DEPTH", 1))
if depth == 2:
cache_files_urlpath = os.path.join(
cache_files_urlpath, datetime.date.today().isoformat()
)
elif depth != 1:
context.warn(f"CACHE_DETPH={depth} is not supported.")

logger.info("Processing job", job_id=job_id)
collection_id = config.get("collection_id")
cacholote.config.set(
logger=LOGGER,
cache_db_urlpath=None,
create_engine_kwargs={},
cache_files_urlpath=cache_files_urlpath,
sessionmaker=context.session_maker,
context=context,
tag=collection_id,
)
fs, dirname = cacholote.utils.get_cache_files_fs_dirname()

adaptor_class = cads_adaptors.get_adaptor_class(entry_point, setup_code)
adaptor = adaptor_class(form=form, context=context, **config)
collection_id = config.get("collection_id")
cwd = os.getcwd()
with tempfile.TemporaryDirectory() as tmpdir:
os.chdir(tmpdir)
try:
request = {k: request[k] for k in sorted(request.keys())}
with cacholote.config.set(tag=collection_id):
result = cacholote.cacheable(
adaptor.retrieve, collection_id=collection_id
)(request=request)
except Exception as err:
logger.exception(job_id=job_id, event_type="EXCEPTION")
context.add_user_visible_error(
f"The job failed with: {err.__class__.__name__}"
)
context.error(f"{err.__class__.__name__}: {str(err)}")
raise
finally:
os.chdir(cwd)
fs, _ = cacholote.utils.get_cache_files_fs_dirname()
fs.chmod(result.result["args"][0]["file:local_path"], acl="public-read")
try:
with utils.enter_tmp_working_dir() as working_dir:
base_dir = dirname if "file" in fs.protocol else working_dir
with utils.make_cache_tmp_path(base_dir) as cache_tmp_path:
adaptor = adaptor_class(
form=form,
context=context,
cache_tmp_path=cache_tmp_path,
**config,
)
request = {k: request[k] for k in sorted(request.keys())}
cached_retrieve = cacholote.cacheable(
adaptor.retrieve,
collection_id=collection_id,
)
result = cached_retrieve(request=request)
except Exception as err:
logger.exception(job_id=job_id, event_type="EXCEPTION")
context.add_user_visible_error(f"The job failed with: {err.__class__.__name__}")
context.error(f"{err.__class__.__name__}: {str(err)}")
raise

if "s3" in fs.protocol:
fs.chmod(result.result["args"][0]["file:local_path"], acl="public-read")
with context.session_maker() as session:
request = cads_broker.database.set_request_cache_id(
request_uid=job_id,
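For reference, a small sketch of the cache-path selection that submit_workflow now performs: one volume is picked at random from the data-volumes config and, when CACHE_DEPTH=2, a per-day subdirectory is appended (the volume list below is hypothetical):

import datetime
import os
import random

# Hypothetical volumes, as returned by utils.parse_data_volumes_config().
data_volumes = ["/cache/volume-0", "/cache/volume-1"]

cache_files_urlpath = random.choice(data_volumes)
depth = int(os.getenv("CACHE_DEPTH", 1))
if depth == 2:
    # depth 2 adds a date-based subdirectory, e.g. /cache/volume-1/2024-09-13
    cache_files_urlpath = os.path.join(
        cache_files_urlpath, datetime.date.today().isoformat()
    )
elif depth != 1:
    # only depths 1 and 2 are supported; the worker logs a warning here
    print(f"CACHE_DEPTH={depth} is not supported.")

print(cache_files_urlpath)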
5 changes: 5 additions & 0 deletions tests/test_10_cache_cleaner.py
@@ -25,6 +25,11 @@ def test_cache_cleaner(
cached_path = pathlib.Path(cached_open(dummy_path).name)
assert cached_path.exists()

# create data volumes config
data_volumes_config = tmp_path / "data-volumes.config"
data_volumes_config.write_text(cache_files_urlpath)
monkeypatch.setenv("DATA_VOLUMES_CONFIG", str(data_volumes_config))

# clean cache
monkeypatch.setenv("MAX_SIZE", "0")
monkeypatch.setenv("USE_DATABASE", use_database)
38 changes: 38 additions & 0 deletions tests/test_30_utils.py
@@ -0,0 +1,38 @@
import os
import pathlib
import tempfile

import pytest

from cads_worker import utils


def test_utils_parse_data_volumes_config(
tmp_path: pathlib.Path,
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setenv("FOO", "foo")
monkeypatch.setenv("BAR", "bar")
data_volumes_config = tmp_path / "data-volumes.config"
data_volumes_config.write_text("$FOO\n${BAR}")
assert utils.parse_data_volumes_config(str(data_volumes_config)) == ["foo", "bar"]

monkeypatch.setenv("DATA_VOLUMES_CONFIG", str(data_volumes_config))
assert utils.parse_data_volumes_config(None) == ["foo", "bar"]


def test_utils_enter_tmp_working_dir() -> None:
with utils.enter_tmp_working_dir() as tmp_working_dir:
assert os.getcwd() == tmp_working_dir
assert os.path.dirname(tmp_working_dir) == os.path.realpath(
tempfile.gettempdir()
)
assert not os.path.exists(tmp_working_dir)


def test_utils_make_cache_tmp_path(tmp_path: pathlib.Path) -> None:
with utils.make_cache_tmp_path(str(tmp_path)) as cache_tmp_path:
assert cache_tmp_path.parent == tmp_path
assert cache_tmp_path.with_suffix(".lock").exists()
assert not cache_tmp_path.exists()
assert not cache_tmp_path.with_suffix(".lock").exists()