Select targeted software frontend in a clever way #893

Merged (3 commits) on Sep 24, 2024
82 changes: 50 additions & 32 deletions strax/context.py
@@ -199,7 +199,7 @@ def __init__(
applied to plugins
:param register: plugin class or list of plugin classes to register
:param register_all: module for which all plugin classes defined in it
will be registered.
will be registered.
:param processors: A mapping of processor names to classes to use for
data processing.
Any additional kwargs are considered Context-specific options; see
@@ -2253,15 +2253,32 @@ def _check_copy_to_frontend_kwargs(
"""Simple kwargs checks for copy_to_frontend."""
if not self.is_stored(run_id, target):
raise strax.DataNotAvailable(f"Cannot copy {run_id} {target} since it does not exist")
if len(strax.to_str_tuple(target)) > 1:
raise ValueError("copy_to_frontend only works for a single target at the time")
if target_frontend_id is not None and target_frontend_id >= len(self.storage):
raise ValueError(
f"Cannot select {target_frontend_id}-th frontend as "
f"we only have {len(self.storage)} frontends!"
)
if rechunk and rechunk_to_mb == strax.DEFAULT_CHUNK_SIZE_MB:
self.log.warning("No <rechunk_to_mb> specified!")
# Reuse the shared validation code
self._check_merge_per_chunk_storage_kwargs(run_id, target, target_frontend_id)

def _get_target_sf(self, run_id, target, target_frontend_id):
"""Get the target storage frontends for copy_to_frontend and merge_per_chunk_storage."""
if target_frontend_id is None:
target_sf = self.storage
elif len(self.storage) > target_frontend_id:
target_sf = [self.storage[target_frontend_id]]

# Keep frontends that:
# 1. don't already have the data; and
# 2. take the data; and
# 3. are not readonly
target_sf = [
t_sf
for t_sf in target_sf
if (
not self._is_stored_in_sf(run_id, target, t_sf)
and t_sf._we_take(target)
and t_sf.readonly is False
)
]
return target_sf

def copy_to_frontend(
self,
@@ -2288,27 +2305,12 @@ def copy_to_frontend(
self._check_copy_to_frontend_kwargs(
run_id, target, target_frontend_id, rechunk, rechunk_to_mb
)
if target_frontend_id is None:
target_sf = self.storage
elif len(self.storage) > target_frontend_id:
target_sf = [self.storage[target_frontend_id]]

# Figure out which of the frontends has the data. Raise error when none
source_sf = self.get_source_sf(run_id, target, should_exist=True)[0]

# Keep frontends that:
# 1. don't already have the data; and
# 2. take the data; and
# 3. are not readonly
target_sf = [
t_sf
for t_sf in target_sf
if (
not self._is_stored_in_sf(run_id, target, t_sf)
and t_sf._we_take(target)
and t_sf.readonly is False
)
]
# Get the target storage frontends
target_sf = self._get_target_sf(run_id, target, target_frontend_id)
self.log.info(f"Copy data from {source_sf} to {target_sf}")

if not len(target_sf):
@@ -2365,19 +2367,31 @@ def wrapped_loader():
"do you have two storage frontends writing to the same place?"
)

def _check_merge_per_chunk_storage_kwargs(self, run_id, target, target_frontend_id) -> None:
if len(strax.to_str_tuple(target)) > 1:
raise ValueError("copy_to_frontend only works for a single target at the time")
if target_frontend_id is not None and target_frontend_id >= len(self.storage):
raise ValueError(
f"Cannot select {target_frontend_id}-th frontend as "
f"we only have {len(self.storage)} frontends!"
)

def merge_per_chunk_storage(
self,
run_id: str,
target: str,
per_chunked_dependency: str,
rechunk=True,
chunk_number_group: ty.Optional[ty.List[ty.List[int]]] = None,
target_frontend_id: ty.Optional[int] = None,
):
"""Merge the per-chunked data from the per-chunked dependency into the target storage."""

if self.is_stored(run_id, target):
raise ValueError(f"Data {target} for {run_id} already exists.")

self._check_merge_per_chunk_storage_kwargs(run_id, target, target_frontend_id)

if chunk_number_group is not None:
combined_chunk_numbers = list(itertools.chain(*chunk_number_group))
if len(combined_chunk_numbers) != len(set(combined_chunk_numbers)):
@@ -2397,17 +2411,20 @@ def merge_per_chunk_storage(

# Usually we want to save in the same storage frontend
# Here we assume that the target is stored chunk by chunk of the dependency
target_sf = source_sf = self.get_source_sf(
source_sf = self.get_source_sf(
run_id,
target,
chunk_number={per_chunked_dependency: chunk_number},
should_exist=True,
)[0]

# Get the target storage frontends
target_sf = self._get_target_sf(run_id, target, target_frontend_id)

def wrapped_loader():
"""Wrapped loader for changing the target_size_mb."""
for chunk_number in chunk_number_group:
# Mostly copied from self.copy_to_frontend
# Mostly revised from self.copy_to_frontend
# Get the info from the source backend (s_be) that we need to fill
# the target backend (t_be) with
data_key = self.key_for(
@@ -2429,17 +2446,18 @@ def wrapped_loader():
except StopIteration:
continue

# Fill the target buffer
data_key = self.key_for(run_id, target, chunk_number=_chunk_number)
t_be_str, t_be_key = target_sf.find(data_key, write=True)
target_be = target_sf._get_backend(t_be_str)
target_plugin = self.__get_plugin(run_id, target, chunk_number=_chunk_number)
target_md = target_plugin.metadata(run_id, target)
# Copied from StorageBackend.saver
if "dtype" in target_md:
target_md["dtype"] = target_md["dtype"].descr.__repr__()
saver = target_be._saver(t_be_key, target_md)
saver.save_from(wrapped_loader(), rechunk=rechunk)
for t_sf in target_sf:
# Fill the target buffer
t_be_str, t_be_key = t_sf.find(data_key, write=True)
target_be = t_sf._get_backend(t_be_str)
saver = target_be._saver(t_be_key, target_md)
saver.save_from(wrapped_loader(), rechunk=rechunk)

def get_source(
self,
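For orientation, here is a minimal sketch (not part of the diff) of how the refactored selection might be exercised; the storage directories, run ID, and use of the test plugins from strax.testutils are illustrative assumptions.

import strax
from strax.testutils import Records, Peaks  # test plugins, used here only for illustration

st = strax.Context(
    storage=[strax.DataDirectory("./hot_storage")],  # frontend 0
    register=[Records, Peaks],
)
run_id = "012345"                                    # hypothetical run
st.make(run_id, "peaks")                             # data now lives in frontend 0 only

# Add a second frontend and copy the data there; _get_target_sf keeps only
# frontends that lack the data, accept this data type, and are not readonly.
st.storage.append(strax.DataDirectory("./cold_storage"))
st.copy_to_frontend(run_id, "peaks", target_frontend_id=1, rechunk_to_mb=200)

# When a target was instead produced and saved chunk by chunk against a
# per-chunked dependency, the pieces can be merged with the same selection:
# st.merge_per_chunk_storage(run_id, "peaks", per_chunked_dependency="records")

Both copy_to_frontend and merge_per_chunk_storage now go through _get_target_sf, so readonly frontends and frontends that already hold the data are skipped consistently.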
3 changes: 2 additions & 1 deletion strax/io.py
@@ -11,6 +11,7 @@
from ast import literal_eval

import strax
from strax import RUN_METADATA_PATTERN

export, __all__ = strax.exporter()

@@ -102,7 +103,7 @@ def _compress_blosc(data):
@export
def dry_load_files(dirname, chunk_number=None):
prefix = strax.storage.files.dirname_to_prefix(dirname)
metadata_json = f"{prefix}-metadata.json"
metadata_json = RUN_METADATA_PATTERN % prefix
md_path = os.path.join(dirname, metadata_json)

with open(md_path, mode="r") as f:
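A short usage note on the function this hunk touches: dry_load_files reads stored data straight from a data directory, without building a Context. The path below is a hypothetical example, and top-level access via strax.dry_load_files assumes the @export decorator exposes it like the other io helpers.

import strax

# Hypothetical strax data directory, e.g. written by a DataDirectory frontend.
dirname = "./strax_data/012345-peaks-0123456789abcdef"

data = strax.dry_load_files(dirname)  # loads all stored chunks
# chunk_number=... narrows the load to specific chunk(s); see the signature above.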
5 changes: 3 additions & 2 deletions strax/storage/files.py
@@ -10,6 +10,7 @@
from .common import StorageFrontend

export, __all__ = strax.exporter()
__all__.extend(["RUN_METADATA_PATTERN"])

RUN_METADATA_PATTERN = "%s-metadata.json"

@@ -230,7 +231,7 @@ def __init__(

def _get_metadata(self, dirname):
prefix = dirname_to_prefix(dirname)
metadata_json = f"{prefix}-metadata.json"
metadata_json = RUN_METADATA_PATTERN % prefix
md_path = osp.join(dirname, metadata_json)

if not osp.exists(md_path):
@@ -300,7 +301,7 @@ def __init__(self, dirname, metadata, **kwargs):
self.dirname = dirname
self.tempdirname = dirname + "_temp"
self.prefix = dirname_to_prefix(dirname)
self.metadata_json = f"{self.prefix}-metadata.json"
self.metadata_json = RUN_METADATA_PATTERN % self.prefix

if os.path.exists(dirname):
print(f"Removing data in {dirname} to overwrite")
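To make the substitution concrete, the newly exported RUN_METADATA_PATTERN maps a data-directory prefix to its metadata filename, reproducing exactly what each replaced f-string spelled out; the prefix below is a made-up example.

from strax import RUN_METADATA_PATTERN

prefix = "012345-peaks-0123456789abcdef"   # hypothetical prefix from dirname_to_prefix
metadata_json = RUN_METADATA_PATTERN % prefix
assert metadata_json == "012345-peaks-0123456789abcdef-metadata.json"
# Identical to the old f"{prefix}-metadata.json", but now defined in one place.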
4 changes: 2 additions & 2 deletions strax/storage/zipfiles.py
@@ -42,7 +42,7 @@ def _find(self, key, write, allow_incomplete, fuzzy_for, fuzzy_for_options):
try:
dirname = str(key)
prefix = strax.dirname_to_prefix(dirname)
zp.getinfo(f"{dirname}/{prefix}-metadata.json")
zp.getinfo(f"{dirname}/{RUN_METADATA_PATTERN % prefix}")
return bk
except KeyError:
pass
@@ -111,7 +111,7 @@ def _get_metadata(self, zipn_and_dirn):
zipn, dirn = zipn_and_dirn
with zipfile.ZipFile(zipn) as zp:
prefix = strax.dirname_to_prefix(dirn)
with zp.open(f"{dirn}/{prefix}-metadata.json") as f:
with zp.open(f"{dirn}/{RUN_METADATA_PATTERN % prefix}") as f:
return json.loads(f.read())

def saver(self, *args, **kwargs):
6 changes: 3 additions & 3 deletions tests/test_context.py
@@ -185,9 +185,9 @@ def test_copy_to_frontend():
# Make sure both frontends have the same data.
assert os.listdir(temp_dir) == os.listdir(temp_dir_2)
rec_folder = os.listdir(temp_dir)[0]
assert os.listdir(os.path.join(temp_dir, rec_folder)) == os.listdir(
os.path.join(temp_dir_2, rec_folder)
)
list_temp_dir = sorted(os.listdir(os.path.join(temp_dir, rec_folder)))
list_temp_dir_2 = sorted(os.listdir(os.path.join(temp_dir_2, rec_folder)))
assert list_temp_dir == list_temp_dir_2

# Clear the temp dir
shutil.rmtree(temp_dir_2)
8 changes: 6 additions & 2 deletions tests/test_core.py
@@ -5,6 +5,7 @@
import os.path as osp
import pytest

from strax import RUN_METADATA_PATTERN
from strax.testutils import *

processing_conditions = pytest.mark.parametrize(
@@ -111,7 +112,10 @@ def test_filestore(allow_multiprocess, max_workers, processor):
# The first dir contains peaks.
# It should have one data chunk (rechunk is on) and a metadata file
prefix = strax.dirname_to_prefix(data_dirs[0])
assert sorted(os.listdir(data_dirs[0])) == [f"{prefix}-000000", f"{prefix}-metadata.json"]
assert sorted(os.listdir(data_dirs[0])) == [
f"{prefix}-000000",
RUN_METADATA_PATTERN % prefix,
]

# Check metadata got written correctly.
metadata = mystrax.get_metadata(run_id, "peaks")
@@ -121,7 +125,7 @@ def test_filestore(allow_multiprocess, max_workers, processor):
assert len(metadata["chunks"]) == 1

# Check data gets loaded from cache, not rebuilt
md_filename = osp.join(data_dirs[0], f"{prefix}-metadata.json")
md_filename = osp.join(data_dirs[0], RUN_METADATA_PATTERN % prefix)
mtime_before = osp.getmtime(md_filename)
peaks_2 = mystrax.get_array(run_id=run_id, targets="peaks")
np.testing.assert_array_equal(peaks_1, peaks_2)
4 changes: 3 additions & 1 deletion tests/test_saving.py
@@ -4,6 +4,8 @@
import os
import tempfile

from strax import RUN_METADATA_PATTERN


class TestPerRunDefaults(unittest.TestCase):
"""Test the saving behavior of the context."""
@@ -72,7 +74,7 @@ def test_raise_corruption(self):

# copied from FileSytemBackend (maybe abstractify the method separately?)
prefix = strax.dirname_to_prefix(data_path)
metadata_json = f"{prefix}-metadata.json"
metadata_json = RUN_METADATA_PATTERN % prefix
md_path = os.path.join(data_path, metadata_json)
assert os.path.exists(md_path)

2 changes: 1 addition & 1 deletion tests/test_storage.py
@@ -187,7 +187,7 @@ def test_check_chunk_n(self):
prefix = strax.storage.files.dirname_to_prefix(backend_key)
md = st_new.get_metadata(self.run_id, self.target)
md["chunks"][0]["n"] += 1
md_path = os.path.join(backend_key, f"{prefix}-metadata.json")
md_path = os.path.join(backend_key, strax.RUN_METADATA_PATTERN % prefix)
with open(md_path, "w") as file:
json.dump(md, file, indent=4)
