Unify functionality of super and hyperrun #871

Merged (16 commits, Aug 22, 2024)
167 changes: 102 additions & 65 deletions strax/chunk.py
@@ -24,8 +24,6 @@ class Chunk:
# run_id is not superfluous to track:
# this could change during the run in superruns (in the future)
run_id: str
subruns: dict
superrun: dict
start: int
end: int

@@ -93,14 +91,7 @@ def __init__(
f"Attempt to create chunk {self} whose data ends late at {data_ends_at}"
)

if superrun is None:
self.superrun = {run_id: {"start": start, "end": end}}
else:
if not isinstance(superrun, dict):
raise ValueError(f"Attempt to create chunk {self} with non-dict superrun")
if superrun == {}:
raise ValueError(f"Attempt to create chunk {self} with empty superrun")
self.superrun = superrun
self.superrun = superrun

def __len__(self):
return len(self.data)
@@ -128,6 +119,22 @@ def duration(self):
def is_superrun(self):
return bool(self.subruns) and self.run_id.startswith("_")

@property
def subruns(self):
return self._subruns

@subruns.setter
def subruns(self, subruns):
if isinstance(subruns, dict) and None in subruns:
raise ValueError(
f"Attempt to create chunk {self} with None as run_id in subrun {subruns}"
)
if subruns is None:
self._subruns = None
else:
self._subruns = dict(sorted(subruns.items(), key=lambda x: x[1]["start"]))
_sorted_subruns_check(self._subruns)

@property
def first_subrun(self):
_subrun = None
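
For orientation, the following is a minimal standalone sketch of what the new subruns setter above enforces; it uses plain dicts rather than strax objects, and the helper name and example run_ids are purely illustrative.

# Illustrative sketch only, not strax API: mirrors the subruns setter above.
def normalize_subruns(subruns):
    """Sort subruns by start time; reject None run_ids and overlapping ranges."""
    if subruns is None:
        return None
    if None in subruns:
        raise ValueError(f"None is not a valid run_id in subruns {subruns}")
    ordered = dict(sorted(subruns.items(), key=lambda x: x[1]["start"]))
    ranges = list(ordered.values())
    for left, right in zip(ranges, ranges[1:]):
        if left["end"] > right["start"]:
            raise ValueError(f"Subruns are overlapping: {ordered}")
    return ordered

# Out-of-order input comes back sorted by start time:
normalize_subruns({"002": {"start": 20, "end": 30}, "001": {"start": 0, "end": 10}})
# -> {'001': {'start': 0, 'end': 10}, '002': {'start': 20, 'end': 30}}
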
@@ -144,14 +151,42 @@ def last_subrun(self):

def _get_subrun(self, index):
"""Returns subrun according to position in chunk."""
subrun_id = list(self.subruns.keys())[index]
run_id = list(self.subruns.keys())[index]
_subrun = {
"run_id": subrun_id,
"start": self.subruns[subrun_id]["start"],
"end": self.subruns[subrun_id]["end"],
"run_id": run_id,
"start": self.subruns[run_id]["start"],
"end": self.subruns[run_id]["end"],
}
return _subrun

@property
def superrun(self):
return self._superrun

@superrun.setter
def superrun(self, superrun):
"""Superrun can only be None or dict with non-None keys."""
if not isinstance(superrun, dict) and superrun is not None:
raise ValueError(
"When creating chunk, superrun can only be dict or None. "
f"But got {superrun} for {self}."
)
if superrun is None:
superrun = {self.run_id: {"start": self.start, "end": self.end}}
if len(superrun) == 0:
raise ValueError(f"Attempt to create chunk {self} with empty superrun")
if None in superrun:
raise ValueError(
f"Attempt to create chunk {self} with None as run_id in superrun {superrun}"
)
# The only case in which self.run_id can be None is when self was concatenated from different runs
if len(superrun) == 1 and self.run_id is None:
raise ValueError(
f"If superrun {superrun} of {self} has only one run_id, run_id should be provided."
)
self._superrun = dict(sorted(superrun.items(), key=lambda x: x[1]["start"]))
_sorted_subruns_check(self._superrun)

def _mbs(self):
if self.duration:
return (self.nbytes / 1e6) / (self.duration / 1e9)
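
The defaulting and validation rules of the superrun setter above can be summarized in a small hedged sketch, again on plain values; normalize_superrun and its arguments are illustrative names, not part of strax.

# Illustrative sketch only, not strax API: mirrors the superrun setter above.
def normalize_superrun(superrun, run_id, start, end):
    """Default a missing superrun to the chunk's own run and validate it."""
    if superrun is not None and not isinstance(superrun, dict):
        raise ValueError("superrun can only be a dict or None")
    if superrun is None:
        superrun = {run_id: {"start": start, "end": end}}
    if len(superrun) == 0:
        raise ValueError("superrun may not be empty")
    if None in superrun:
        raise ValueError("None is not a valid run_id in a superrun")
    if len(superrun) == 1 and run_id is None:
        # A chunk whose superrun covers a single run must still carry that run_id
        raise ValueError("run_id must be provided when the superrun has one run")
    return dict(sorted(superrun.items(), key=lambda x: x[1]["start"]))

# Passing None falls back to the chunk's own run and time range:
normalize_superrun(None, "001", 0, 100)
# -> {'001': {'start': 0, 'end': 100}}
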
@@ -178,7 +213,6 @@ def split(self, t: ty.Union[int, None], allow_early_split=False):
data1, data2, t = split_array(data=self.data, t=t, allow_early_split=allow_early_split)

common_kwargs = dict(
run_id=self.run_id,
dtype=self.dtype,
data_type=self.data_type,
data_kind=self.data_kind,
@@ -187,22 +221,32 @@ def split(self, t: ty.Union[int, None], allow_early_split=False):

subruns_first_chunk, subruns_second_chunk = _split_runs_in_chunk(self.subruns, t)
superrun_first_chunk, superrun_second_chunk = _split_runs_in_chunk(self.superrun, t)
# If the superrun is split and a fragment covers only one run,
# the run_id needs to be recovered
if superrun_first_chunk is None or len(superrun_first_chunk) == 1:
run_id_first_chunk = list(self.superrun.keys())[0]
else:
run_id_first_chunk = self.run_id
if superrun_second_chunk is None or len(superrun_second_chunk) == 1:
run_id_second_chunk = list(self.superrun.keys())[-1]
else:
run_id_second_chunk = self.run_id

c1 = strax.Chunk(
start=self.start,
end=max(self.start, t), # type: ignore
data=data1,
subruns=subruns_first_chunk,
superrun=superrun_first_chunk,
**common_kwargs,
**{**common_kwargs, "run_id": run_id_first_chunk},
)
c2 = strax.Chunk(
start=max(self.start, t), # type: ignore
end=max(t, self.end), # type: ignore
data=data2,
subruns=subruns_second_chunk,
superrun=superrun_second_chunk,
**common_kwargs,
**{**common_kwargs, "run_id": run_id_second_chunk},
)
return c1, c2
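
The run_id recovery above can be illustrated with a small sketch; pick_run_ids is a made-up helper that reproduces only the branch logic, on plain dicts.

# Illustrative sketch only: how split() above picks each fragment's run_id.
def pick_run_ids(superrun, run_id, superrun_first, superrun_second):
    """superrun_first/superrun_second are the split superrun dicts (or None)."""
    if superrun_first is None or len(superrun_first) == 1:
        first = list(superrun.keys())[0]    # fragment covers only the first run
    else:
        first = run_id                      # fragment still spans several runs
    if superrun_second is None or len(superrun_second) == 1:
        second = list(superrun.keys())[-1]  # fragment covers only the last run
    else:
        second = run_id
    return first, second

# Splitting a two-run superrun exactly at the run boundary gives each
# single-run fragment its own run_id back:
pick_run_ids(
    {"001": {"start": 0, "end": 10}, "002": {"start": 10, "end": 20}},
    "_superrun",
    {"001": {"start": 0, "end": 10}},
    {"002": {"start": 10, "end": 20}},
)
# -> ('001', '002')
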

@@ -258,7 +302,7 @@ def merge(cls, chunks, data_type="<UNKNOWN>"):
)

@classmethod
def concatenate(cls, chunks, allow_hyperrun=False):
def concatenate(cls, chunks, allow_superrun=False):
"""Create chunk by concatenating chunks of same data type You can pass None's, they will be
ignored."""
chunks = [c for c in chunks if c is not None]
@@ -274,7 +318,7 @@ def concatenate(cls, chunks, allow_hyperrun=False):

run_ids = [c.run_id for c in chunks]

if len(set(run_ids)) != 1 and not allow_hyperrun:
if len(set(run_ids)) != 1 and not allow_superrun:
raise ValueError(
f"Cannot concatenate {data_type} chunks with different run ids: {run_ids}"
)
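
Concretely, the renamed keyword means callers opt into cross-run concatenation as sketched below; the guard is reproduced standalone and the run_ids are made up.

# Illustrative sketch only: the run_id guard in concatenate() above.
def check_run_ids(run_ids, allow_superrun=False):
    if len(set(run_ids)) != 1 and not allow_superrun:
        raise ValueError(f"Cannot concatenate chunks with different run ids: {run_ids}")

check_run_ids(["001", "001"])                        # same run: always fine
check_run_ids(["001", "002"], allow_superrun=True)   # different runs: opt in
# check_run_ids(["001", "002"])                      # would raise ValueError
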
@@ -395,44 +439,26 @@ def split_array(data, t, allow_early_split=False):
return data[:splittable_i], data[splittable_i:], t


@export
def transform_chunk_to_superrun_chunk(superrun_id, chunk):
"""Function which transforms/creates a new superrun chunk from subrun chunk.
def _sorted_subruns_check(subruns):
"""Check if subruns are not overlapping."""
if subruns is None:
return
runs_start_end = list(subruns.values())
for i in range(len(runs_start_end) - 1):
if runs_start_end[i]["end"] > runs_start_end[i + 1]["start"]:
raise ValueError(f"Subruns are overlapping: {subruns}.")

:param superrun_id: id/name of the superrun.
:param chunk: strax.Chunk of a superrun subrun.
:return: strax.Chunk

"""
if chunk is None:
return chunk
if chunk.subruns is not None:
raise ValueError("Chunk {chunk} is already a superrun chunk.")
subruns = {chunk.run_id: {"start": chunk.start, "end": chunk.end}}

return Chunk(
start=chunk.start,
end=chunk.end,
dtype=chunk.dtype,
data_type=chunk.data_type,
data_kind=chunk.data_kind,
run_id=superrun_id,
subruns=subruns,
data=chunk.data,
target_size_mb=chunk.target_size_mb,
)


def _merge_runs_in_chunk(runs_of_chunk, merged_runs):
def _merge_runs_in_chunk(subruns, merged_runs):
"""Merge subruns information during concatenation or merge."""
if runs_of_chunk is None:
if subruns is None:
return
for run_id, run_start_end in runs_of_chunk.items():
for run_id, run_start_end in subruns.items():
merged_runs.setdefault(run_id, [])
merged_runs[run_id].append([run_start_end["start"], run_start_end["end"]])


def _continuity_check(merged_runs, merge=False):
def _mergable_check(merged_runs, merge=False):
"""Check continuity of runs in a superrun chunk."""
for run_id in merged_runs.keys():
merged_runs[run_id].sort(key=lambda x: x[0])
@@ -468,7 +494,7 @@ def _merge_subruns_in_chunk(chunks):
subruns = dict()
for c_i, c in enumerate(chunks):
_merge_runs_in_chunk(c.subruns, subruns)
_continuity_check(subruns)
_mergable_check(subruns)
if subruns:
return subruns
else:
@@ -480,28 +506,41 @@ def _merge_superrun_in_chunk(chunks, merge=False):
superrun = dict()
for c_i, c in enumerate(chunks):
_merge_runs_in_chunk(c.superrun, superrun)
_continuity_check(superrun, merge)
_mergable_check(superrun, merge)
return superrun


def _split_runs_in_chunk(runs_of_chunk, t):
def _pop_out_empty_run_id(subruns):
"""Remove empty run_id from chunks in superrun."""
keys_to_remove = []
for key in subruns.keys():
if subruns[key]["start"] == subruns[key]["end"]:
keys_to_remove.append(key)
for key in keys_to_remove:
subruns.pop(key)


def _split_runs_in_chunk(subruns, t):
"""Split list of runs in a superrun chunk during split.

Also updates their starts and ends.

"""
if runs_of_chunk is None:
if subruns is None:
return None, None
runs_first_chunk = {}
runs_second_chunk = {}
for subrun_id, subrun_start_end in runs_of_chunk.items():
if t < subrun_start_end["start"]:
runs_second_chunk[subrun_id] = subrun_start_end
elif subrun_start_end["start"] <= t < subrun_start_end["end"]:
runs_first_chunk[subrun_id] = {"start": subrun_start_end["start"], "end": int(t)}
runs_second_chunk[subrun_id] = {"start": int(t), "end": subrun_start_end["end"]}
elif subrun_start_end["end"] <= t:
runs_first_chunk[subrun_id] = subrun_start_end
for run_id, run_start_end in subruns.items():
if t <= run_start_end["start"]:
runs_second_chunk[run_id] = run_start_end
elif run_start_end["start"] < t < run_start_end["end"]:
runs_first_chunk[run_id] = {"start": run_start_end["start"], "end": int(t)}
runs_second_chunk[run_id] = {"start": int(t), "end": run_start_end["end"]}
elif run_start_end["end"] <= t:
runs_first_chunk[run_id] = run_start_end
# Pop out empty run_id
_pop_out_empty_run_id(runs_first_chunk)
_pop_out_empty_run_id(runs_second_chunk)
# Make sure that either dictionary with content or None is assigned to Chunk
if runs_first_chunk == {}:
runs_first_chunk = None
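
The new boundary rules of _split_runs_in_chunk above are easiest to see in a standalone sketch on plain dicts: a run starting at or after t goes entirely to the second fragment, a run straddling t is divided, and zero-length leftovers are dropped. split_runs and the run_ids are illustrative only.

# Illustrative sketch only: mirrors the boundary handling of _split_runs_in_chunk.
def split_runs(runs, t):
    first, second = {}, {}
    for run_id, se in runs.items():
        if t <= se["start"]:                  # run lies entirely after the cut
            second[run_id] = se
        elif se["start"] < t < se["end"]:     # run straddles the cut: divide it
            first[run_id] = {"start": se["start"], "end": int(t)}
            second[run_id] = {"start": int(t), "end": se["end"]}
        elif se["end"] <= t:                  # run lies entirely before the cut
            first[run_id] = se
    # Drop zero-length leftovers, then map empty dicts to None
    first = {k: v for k, v in first.items() if v["start"] != v["end"]} or None
    second = {k: v for k, v in second.items() if v["start"] != v["end"]} or None
    return first, second

# Cutting exactly at the run boundary keeps each run whole:
split_runs({"001": {"start": 0, "end": 10}, "002": {"start": 10, "end": 20}}, 10)
# -> ({'001': {'start': 0, 'end': 10}}, {'002': {'start': 10, 'end': 20}})
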
@@ -522,15 +561,13 @@ class Rechunker:

def __init__(self, rechunk=False, run_id=None):
self.rechunk = rechunk
self.is_superrun = run_id and run_id.startswith("_") and not run_id.startswith("__")
self.is_superrun = run_id and run_id.startswith("_")
self.run_id = run_id

self.cache = None

def receive(self, chunk) -> list:
"""Receive a chunk, return list of chunks to send out after merging and splitting."""
if self.is_superrun:
chunk = strax.transform_chunk_to_superrun_chunk(self.run_id, chunk)
if not self.rechunk:
# We aren't rechunking
return [chunk]
Expand All @@ -539,7 +576,7 @@ def receive(self, chunk) -> list:
# We have an old chunk, so we need to concatenate
# We do not expect after concatenation that the chunk will be very large because
# the self.cache is already after splitting according to the target size
chunk = strax.Chunk.concatenate([self.cache, chunk])
chunk = strax.Chunk.concatenate([self.cache, chunk], allow_superrun=self.is_superrun)

target_size_b = chunk.target_size_mb * 1e6

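The PR title's unification shows up most directly in the Rechunker change above: the double-underscore hyperrun case is no longer treated separately. A tiny illustrative check (is_superrun_id is a made-up name, not strax API):

# Illustrative sketch only: the superrun test above now checks just the leading
# underscore, so former "__hyperrun" ids are handled like any other superrun.
def is_superrun_id(run_id):
    return bool(run_id) and run_id.startswith("_")

is_superrun_id("024000")    # False: plain run
is_superrun_id("_sr_test")  # True: superrun
is_superrun_id("__hyper")   # True now (previously excluded as a hyperrun)
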
5 changes: 0 additions & 5 deletions strax/config.py
@@ -116,11 +116,6 @@ def __init__(
f"Please update {self.name} accordingly."
)

# if self.default_by_run is not OMITTED:
# warnings.warn(f"The {self.name} option uses default_by_run,"
# f" which will soon stop working!",
# DeprecationWarning)

if (
sum(
[