Allow splitting in rechunking #865

Merged 14 commits on Aug 16, 2024
2 changes: 1 addition & 1 deletion bin/rechunker
@@ -38,7 +38,7 @@ def parse_args():
'--target_size_mb', '--target-size-mb',
dest='target_size_mb',
type=int,
default=strax.default_chunk_size_mb,
default=strax.DEFAULT_CHUNK_SIZE_MB,
help="Target size MB (uncompressed) of the rechunked data")
parser.add_argument(
'--write_stats_to', '--write-stats-to',
99 changes: 79 additions & 20 deletions strax/chunk.py
@@ -6,10 +6,11 @@
import strax

export, __all__ = strax.exporter()
__all__.extend(["default_chunk_size_mb"])
__all__.extend(["DEFAULT_CHUNK_SIZE_MB", "DEFAULT_CHUNK_SPLIT_NS"])


default_chunk_size_mb = 200
DEFAULT_CHUNK_SIZE_MB = 200
DEFAULT_CHUNK_SPLIT_NS = 1000


@export
@@ -41,7 +42,7 @@ def __init__(
end,
data,
subruns=None,
target_size_mb=default_chunk_size_mb,
target_size_mb=DEFAULT_CHUNK_SIZE_MB,
):
self.data_type = data_type
self.data_kind = data_kind
@@ -180,16 +181,20 @@ def split(self, t: ty.Union[int, None], allow_early_split=False):
target_size_mb=self.target_size_mb,
)

subruns_first_chunk, subruns_second_chunk = _split_subruns_in_chunk(self.subruns, t)

c1 = strax.Chunk(
start=self.start,
end=max(self.start, t), # type: ignore
data=data1,
subruns=subruns_first_chunk,
**common_kwargs,
)
c2 = strax.Chunk(
start=max(self.start, t), # type: ignore
end=max(t, self.end), # type: ignore
data=data2,
subruns=subruns_second_chunk,
**common_kwargs,
)
return c1, c2
@@ -267,7 +272,7 @@ def concatenate(cls, chunks, allow_hyperrun=False):
)

run_id = run_ids[0]
subruns = _update_subruns_in_chunk(chunks)
subruns = _merge_subruns_in_chunk(chunks)

prev_end = 0
for c in chunks:
@@ -402,9 +407,12 @@ def transform_chunk_to_superrun_chunk(superrun_id, chunk):
)


def _update_subruns_in_chunk(chunks):
"""Updates list of subruns in a superrun chunk during concatenation Updates also their
start/ends too."""
def _merge_subruns_in_chunk(chunks):
"""Merge list of subruns in a superrun chunk during concatenation.

Updates also their start/ends too.

"""
subruns = None
for c_i, c in enumerate(chunks):
if not subruns:
@@ -422,6 +430,27 @@ def _update_subruns_in_chunk(chunks):
return subruns


def _split_subruns_in_chunk(subruns, t):
"""Split list of subruns in a superrun chunk during split.

Updates also their start/ends too.

"""
if not subruns:
return None, None
subruns_first_chunk = {}
subruns_second_chunk = {}
for subrun_id, subrun_start_end in subruns.items():
if t < subrun_start_end["start"]:
subruns_second_chunk[subrun_id] = subrun_start_end
elif subrun_start_end["start"] <= t < subrun_start_end["end"]:
subruns_first_chunk[subrun_id] = {"start": subrun_start_end["start"], "end": int(t)}
subruns_second_chunk[subrun_id] = {"start": int(t), "end": subrun_start_end["end"]}
elif subrun_start_end["end"] <= t:
subruns_first_chunk[subrun_id] = subrun_start_end
return subruns_first_chunk, subruns_second_chunk
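
For illustration only (not part of the diff): a minimal sketch of how this module-internal helper splits a hypothetical subruns mapping at a split time t, assuming the {run_id: {"start": ..., "end": ...}} layout used for superrun subruns above. The run ids and times are made up.

subruns = {
    "000001": {"start": 0, "end": 10},   # ends before t: stays in the first chunk
    "000002": {"start": 20, "end": 30},  # straddles t: shared between both chunks
    "000003": {"start": 40, "end": 50},  # starts after t: goes to the second chunk
}
first, second = _split_subruns_in_chunk(subruns, t=25)
# first  -> {"000001": {"start": 0, "end": 10}, "000002": {"start": 20, "end": 25}}
# second -> {"000002": {"start": 25, "end": 30}, "000003": {"start": 40, "end": 50}}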


@export
class Rechunker:
"""Helper class for rechunking.
@@ -439,25 +468,55 @@ def __init__(self, rechunk=False, run_id=None):

self.cache = None

def receive(self, chunk):
def receive(self, chunk) -> list:
"""Receive a chunk, return list of chunks to send out after merging and splitting."""
if self.is_superrun:
chunk = strax.transform_chunk_to_superrun_chunk(self.run_id, chunk)
if not self.rechunk:
# We aren't rechunking
return chunk
return [chunk]

if self.cache is not None:
# We have an old chunk, so we need to concatenate.
# The concatenated chunk should not become much larger than the target size,
# because self.cache was already split according to the target size.
chunk = strax.Chunk.concatenate([self.cache, chunk])
if chunk.data.nbytes >= chunk.target_size_mb * 1e6:
# Enough data to send a new chunk!
self.cache = None
return chunk

target_size_b = chunk.target_size_mb * 1e6

# Get the split indices according to the minimum allowed gap
# between data items and the target chunk size
split_indices = self.get_splits(chunk.data, target_size_b, DEFAULT_CHUNK_SPLIT_NS)
# Split the concatenated chunk at those indices and collect the pieces
chunks = []
for index in split_indices:
_chunk, chunk = chunk.split(
t=chunk.data["time"][index] - int(DEFAULT_CHUNK_SPLIT_NS // 2),
allow_early_split=False,
)
chunks.append(_chunk)
self.cache = chunk
return chunks

def flush(self) -> list:
"""Flush the cache and return the remaining chunk in a list."""
if self.cache is None:
return []
else:
# Not enough data yet, so we cache the chunk
self.cache = chunk
return None
result = self.cache
self.cache = None
return [result]

def flush(self):
result = self.cache
self.cache = None
return result
@staticmethod
def get_splits(data, target_size, min_gap=DEFAULT_CHUNK_SPLIT_NS):
"""Get indices where to split the data into chunks of approximately target_size."""
assumed_i = int(target_size // data.itemsize)
gap_indices = np.argwhere(strax.diff(data) > min_gap).flatten() + 1
split_indices = [0]
if len(gap_indices) != 0:
while split_indices[-1] + assumed_i < gap_indices[-1]:
split_indices.append(
gap_indices[np.abs(gap_indices - assumed_i - split_indices[-1]).argmin()]
)
split_indices = np.diff(split_indices)
return split_indices
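
A rough usage sketch (not from this PR) of the list-based protocol the new Rechunker implements: receive() may now return zero, one, or several chunks, and flush() drains whatever is left in the cache. The run id, chunk_source, and save_chunk below are hypothetical placeholders for a run name, a chunk iterator, and a saver.

rechunker = strax.Rechunker(rechunk=True, run_id="some_run")  # hypothetical run id
for incoming in chunk_source:                 # any iterator yielding strax.Chunk objects
    for outgoing in rechunker.receive(incoming):
        save_chunk(outgoing)                  # placeholder for e.g. a saver's save call
for outgoing in rechunker.flush():            # at most one leftover chunk from the cache
    save_chunk(outgoing)
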
2 changes: 1 addition & 1 deletion strax/config.py
@@ -205,7 +205,7 @@ def validate(
if self.name in config:
value = config[self.name]
if self.type is not OMITTED and not isinstance(value, self.type):
# TODO replace back with InvalidConfiguration
# TODO: replace back with InvalidConfiguration
UserWarning(
f"Invalid type for option {self.name}. "
f"Excepted a {self.type}, got a {type(value)}"
4 changes: 2 additions & 2 deletions strax/context.py
@@ -2284,7 +2284,7 @@ def _check_copy_to_frontend_kwargs(
f"Cannot select {target_frontend_id}-th frontend as "
f"we only have {len(self.storage)} frontends!"
)
if rechunk and rechunk_to_mb == strax.default_chunk_size_mb:
if rechunk and rechunk_to_mb == strax.DEFAULT_CHUNK_SIZE_MB:
self.log.warning("No <rechunk_to_mb> specified!")

def copy_to_frontend(
@@ -2294,7 +2294,7 @@ def copy_to_frontend(
target_frontend_id: ty.Optional[int] = None,
target_compressor: ty.Optional[str] = None,
rechunk: bool = False,
rechunk_to_mb: int = strax.default_chunk_size_mb,
rechunk_to_mb: int = strax.DEFAULT_CHUNK_SIZE_MB,
):
"""Copy data from one frontend to another.

2 changes: 1 addition & 1 deletion strax/plugins/plugin.py
@@ -71,7 +71,7 @@ class Plugin:
rechunk_on_save = True # Saver is allowed to rechunk
# How large (uncompressed) should re-chunked chunks be?
# Meaningless if rechunk_on_save is False
chunk_target_size_mb = strax.default_chunk_size_mb
chunk_target_size_mb = strax.DEFAULT_CHUNK_SIZE_MB

# For a source with online input (e.g. DAQ readers), crash if no new input
# has appeared for this many seconds
15 changes: 15 additions & 0 deletions strax/processing/general.py
@@ -78,6 +78,21 @@ def _overload_endtime(x):
return lambda x: x["time"] + x["length"] * x["dt"]


@export
@numba.jit(nopython=True, nogil=True, cache=True)
def diff(data):
"""Return time differences between items in data."""
# we are sure that time is np.int64
if len(data) == 0:
return np.zeros(0, dtype=np.int64)
results = np.zeros(len(data) - 1, dtype=np.int64)
max_endtime = strax.endtime(data[0])
for i, (time, endtime) in enumerate(zip(data["time"][1:], strax.endtime(data)[:-1])):
max_endtime = max(max_endtime, endtime)
results[i] = time - max_endtime
return results
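
A minimal sketch (not part of the diff) of what the new strax.diff helper computes, assuming a simple interval-like dtype with time, length and dt fields; the exact dtype in real data comes from the plugins, and the expected output is read off the loop above rather than from this PR.

import numpy as np
import strax

data = np.zeros(3, dtype=[("time", np.int64), ("length", np.int32), ("dt", np.int32)])
data["time"] = [0, 1_000, 10_000]
data["length"] = 5
data["dt"] = 100  # endtime = time + length * dt, i.e. 500, 1_500, 10_500
gaps = strax.diff(data)
# Per the loop above this should give [500, 8500]: each entry is an item's start
# time minus the largest endtime of all earlier items.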


@export
@numba.jit(nopython=True, nogil=True, cache=True)
def from_break(x, safe_break, not_before=0, left=True, tolerant=False):
4 changes: 2 additions & 2 deletions strax/processors/post_office.py
@@ -147,9 +147,9 @@ def _register_topic(self, topic: str):
self._readers_done[topic] = []

def register_spy(self, spy: Spy, topic: str):
"""Register spy to recieve all messages on topic.
"""Register spy to receive all messages on topic.

spy.recieve(msg) will be called for each message, and spy.close() when the topic is
spy.receive(msg) will be called for each message, and spy.close() when the topic is
exhausted.

"""
6 changes: 4 additions & 2 deletions strax/processors/single_thread.py
@@ -93,8 +93,10 @@ def __init__(self, saver, rechunk=False):
def receive(self, chunk):
self._save_chunk(self.rechunker.receive(chunk))

def _save_chunk(self, chunk):
if chunk is not None:
def _save_chunk(self, chunks):
for chunk in chunks:
if chunk is None:
continue
self.saver.save(chunk, self.chunk_number)
self.chunk_number += 1

26 changes: 11 additions & 15 deletions strax/storage/common.py
@@ -485,7 +485,7 @@ def loader(self, backend_key, time_range=None, chunk_number=None, executor=None)
data_type=metadata["data_type"],
data_kind=metadata["data_kind"],
dtype=dtype,
target_size_mb=metadata.get("chunk_target_size_mb", strax.default_chunk_size_mb),
target_size_mb=metadata.get("chunk_target_size_mb", strax.DEFAULT_CHUNK_SIZE_MB),
)

required_chunk_metadata_fields = "start end run_id".split()
@@ -571,7 +571,7 @@ def _read_and_format_chunk(

def saver(self, key, metadata, **kwargs):
"""Return saver for data described by key."""
metadata.setdefault("compressor", "blosc") # TODO wrong place?
metadata.setdefault("compressor", "blosc") # TODO: wrong place?
metadata["strax_version"] = strax.__version__
if "dtype" in metadata:
metadata["dtype"] = metadata["dtype"].descr.__repr__()
@@ -648,22 +648,18 @@ def save_from(self, source: typing.Generator, rechunk=True, executor=None):

try:
while not exhausted:
chunk = None

try:
chunk = rechunker.receive(next(source))
chunks = rechunker.receive(next(source))
except StopIteration:
exhausted = True
chunk = rechunker.flush()

if chunk is None:
continue

new_f = self.save(chunk=chunk, chunk_i=chunk_i, executor=executor)
pending = [f for f in pending if not f.done()]
if new_f is not None:
pending += [new_f]
chunk_i += 1
chunks = rechunker.flush()

for chunk in chunks:
new_f = self.save(chunk=chunk, chunk_i=chunk_i, executor=executor)
pending = [f for f in pending if not f.done()]
if new_f is not None:
pending += [new_f]
chunk_i += 1

except strax.MailboxKilled:
# Write exception (with close), but exit gracefully.
2 changes: 1 addition & 1 deletion tests/test_storage.py
@@ -298,7 +298,7 @@ def _rechunking(self, compressor, parallel=False, replace=False):
dest_directory=target_path.name if not replace else None,
replace=True,
compressor=compressor,
target_size_mb=strax.default_chunk_size_mb * 2,
target_size_mb=strax.DEFAULT_CHUNK_SIZE_MB * 2,
parallel=parallel,
max_workers=4,
_timeout=5,
25 changes: 11 additions & 14 deletions tests/test_superruns.py
@@ -1,16 +1,18 @@
import os
import unittest
import shutil
import strax
import numpy as np
import tempfile
from strax.testutils import Records, Peaks, PeakClassification
import datetime
import pytz
import re
import json
from bson import json_util
import tempfile
import shutil
import pytz
import datetime

import unittest
import numpy as np
import pandas as pd
import re

import strax
from strax.testutils import Records, Peaks, PeakClassification


class TestSuperRuns(unittest.TestCase):
@@ -229,11 +231,6 @@ def test_rechnunking_and_loading(self):
rr_superrun = self.context.get_array("_superrun_test_rechunking", "records")
rr_subruns = self.context.get_array(self.subrun_ids, "records")

chunks = [chunk for chunk in self.context.get_iter("_superrun_test_rechunking", "records")]
assert len(chunks) > 1, (
"Number of chunks should be larger 1. "
f"{chunks[0].target_size_mb, chunks[0].nbytes / 10**6}"
)
assert np.all(rr_superrun["time"] == rr_subruns["time"])

def test_superrun_triggers_subrun_processing(self):
Expand Down