From 5a3a6abe1a6680d011b5f3de9a0486b21c5963b0 Mon Sep 17 00:00:00 2001
From: Jelle Aalbers
Date: Wed, 26 Jun 2019 15:12:28 +0200
Subject: [PATCH 1/9] Update HISTORY for the release

---
 HISTORY.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/HISTORY.md b/HISTORY.md
index 03bccfed0..65ac07460 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,3 +1,9 @@
+0.7.4 / 2019-06-26
+-------------------
+- Fix availability checks (#194)
+- Allow selection of runs by name (#192)
+- Fix some context methods for multi-output plugins
+
 0.7.3 / 2019-06-17
 -------------------
 - Multiple outputs per plugin (#190)

From a8ff3c7f7bf10737e7a952613c49196f0fb9465c Mon Sep 17 00:00:00 2001
From: JelleAalbers
Date: Wed, 26 Jun 2019 15:12:41 +0200
Subject: [PATCH 2/9] =?UTF-8?q?Bump=20version:=200.7.3=20=E2=86=92=200.7.4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .bumpversion.cfg    | 2 +-
 docs/source/conf.py | 4 ++--
 setup.py            | 2 +-
 strax/__init__.py   | 2 +-
 4 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 24f9d9045..a623bf396 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.7.3
+current_version = 0.7.4
 files = setup.py strax/__init__.py docs/source/conf.py
 commit = True
 tag = True

diff --git a/docs/source/conf.py b/docs/source/conf.py
index d68b65a5d..c17e7d081 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -65,9 +65,9 @@
 # built documents.
 #
 # The short X.Y version.
-version = '0.7.3'
+version = '0.7.4'
 # The full version, including alpha/beta/rc tags.
-release = '0.7.3'
+release = '0.7.4'
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.

diff --git a/setup.py b/setup.py
index d85d78d5a..d0b0ebfc3 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@
     history = file.read()
 
 setuptools.setup(name='strax',
-                 version='0.7.3',
+                 version='0.7.4',
                  description='Streaming analysis for xenon TPCs',
                  author='Jelle Aalbers',
                  url='https://github.com/AxFoundation/strax',

diff --git a/strax/__init__.py b/strax/__init__.py
index 3b1691976..a7d60261c 100644
--- a/strax/__init__.py
+++ b/strax/__init__.py
@@ -1,5 +1,5 @@
 # flake8: noqa
-__version__ = '0.7.3'
+__version__ = '0.7.4'
 
 # Glue the package together
 # See https://www.youtube.com/watch?v=0oTh1CXRaQ0 if this confuses you

From 86916e8ec48bfebe1e279cb0f0db79f2ce9d4127 Mon Sep 17 00:00:00 2001
From: Jelle Aalbers
Date: Fri, 28 Jun 2019 18:32:05 +0200
Subject: [PATCH 3/9] Extensible context, run selection to separate file

---
 strax/__init__.py      |   4 +
 strax/context.py       | 292 +----------------------------------------
 strax/run_selection.py | 263 +++++++++++++++++++++++++++++++++++++
 3 files changed, 271 insertions(+), 288 deletions(-)
 create mode 100644 strax/run_selection.py

diff --git a/strax/__init__.py b/strax/__init__.py
index a7d60261c..48b4804a7 100644
--- a/strax/__init__.py
+++ b/strax/__init__.py
@@ -21,6 +21,10 @@
 from .processor import *
 from .context import *
 
+# Just run this file, it will add new methods to Context
+from . import run_selection
+del run_selection
+
 from .io import *
 
 from strax.processing.data_reduction import *

diff --git a/strax/context.py b/strax/context.py
index 2b54300d0..10062e361 100644
--- a/strax/context.py
+++ b/strax/context.py
@@ -820,227 +820,10 @@ def is_stored(self, run_id, target, **kwargs):
                 continue
         return False
 
-    def list_available(self, target, **kwargs):
-        """Return sorted list of run_id's for which target is available
-        """
-        # TODO duplicated code with with get_iter
-        if len(kwargs):
-            # noinspection PyMethodFirstArgAssignment
-            self = self.new_context(**kwargs)
-
-        if self.runs is None:
-            self.scan_runs()
-
-        keys = set([
-            self.key_for(run_id, target)
-            for run_id in self.runs['name'].values])
-
-        found = set()
-        for sf in self.storage:
-            remaining = keys - found
-            is_found = sf.find_several(list(remaining), **self._find_options)
-            found |= set([k for i, k in enumerate(remaining)
-                          if is_found[i]])
-        return list(sorted([x.run_id for x in found]))
-
-    def scan_runs(self,
-                  check_available=tuple(),
-                  store_fields=tuple()):
-        """Update and return self.runs with runs currently available
-        in all storage frontends.
-        :param check_available: Check whether these data types are available
-        Availability of xxx is stored as a boolean in the xxx_available
-        column.
-        :param store_fields: Additional fields from run doc to include
-        as rows in the dataframe.
-
-        The context options scan_availability and store_run_fields list
-        data types and run fields, respectively, that will always be scanned.
-        """
-        store_fields = tuple(set(
-            list(strax.to_str_tuple(store_fields))
-            + ['name', 'number', 'tags', 'mode']
-            + list(self.context_config['store_run_fields'])))
-        check_available = tuple(set(
-            list(strax.to_str_tuple(check_available))
-            + list(self.context_config['check_available'])))
-
-        docs = None
-        for sf in self.storage:
-            _temp_docs = []
-            for doc in sf._scan_runs(store_fields=store_fields):
-                # If there is no number, make one from the name
-                if 'number' not in doc:
-                    if 'name' not in doc:
-                        raise ValueError(f"Invalid run doc {doc}, contains "
-                                         f"neither name nor number.")
-                    doc['number'] = int(doc['name'])
-
-                # If there is no name, make one from the number
-                doc.setdefault('name', str(doc['number']))
-
-                doc.setdefault('mode', '')
-
-                # Flatten the tags field, if it exists
-                doc['tags'] = ','.join([t['name']
-                                        for t in doc.get('tags', [])])
-
-                # Flatten the rest of the doc (mainly in case the mode field
-                # is something deeply nested)
-                doc = strax.flatten_dict(doc, separator='.')
-
-                _temp_docs.append(doc)
-
-            if len(_temp_docs):
-                new_docs = pd.DataFrame(_temp_docs)
-            else:
-                new_docs = pd.DataFrame([], columns=store_fields)
-
-            if docs is None:
-                docs = new_docs
-            else:
-                # Keep only new runs (not found by earlier frontends)
-                docs = pd.concat([
-                    docs,
-                    new_docs[
-                        ~np.in1d(new_docs['name'], docs['name'])]],
-                    sort=False)
-
-        self.runs = docs
-
-        for d in tqdm(check_available,
-                      desc='Checking data availability'):
-            self.runs[d + '_available'] = np.in1d(
-                self.runs.name.values,
-                self.list_available(d))
-
-        return self.runs
-
-    def select_runs(self, run_mode=None, run_id=None,
-                    include_tags=None, exclude_tags=None,
-                    available=tuple(),
-                    pattern_type='fnmatch', ignore_underscore=True):
-        """Return pandas.DataFrame with basic info from runs
-        that match selection criteria.
-        :param run_mode: Pattern to match run modes (reader.ini.name)
-        :param run_id: Pattern to match a run_id or run_ids
-        :param available: str or tuple of strs of data types for which data
-        must be available according to the runs DB.
-
-        :param include_tags: String or list of strings of patterns
-        for required tags
-        :param exclude_tags: String / list of strings of patterns
-        for forbidden tags.
-        Exclusion criteria have higher priority than inclusion criteria.
-        :param pattern_type: Type of pattern matching to use.
-        Defaults to 'fnmatch', which means you can use
-        unix shell-style wildcards (`?`, `*`).
-        The alternative is 're', which means you can use
-        full python regular expressions.
-        :param ignore_underscore: Ignore the underscore at the start of tags
-        (indicating some degree of officialness or automation).
-
-        Examples:
-         - `run_selection(include_tags='blinded')`
-           select all datasets with a blinded or _blinded tag.
-         - `run_selection(include_tags='*blinded')`
-           ... with blinded or _blinded, unblinded, blablinded, etc.
-         - `run_selection(include_tags=['blinded', 'unblinded'])`
-           ... with blinded OR unblinded, but not blablinded.
-         - `run_selection(include_tags='blinded',
-                          exclude_tags=['bad', 'messy'])`
-           select blinded dsatasets that aren't bad or messy
-        """
-        if self.runs is None:
-            self.scan_runs()
-        dsets = self.runs.copy()
-
-        if pattern_type not in ('re', 'fnmatch'):
-            raise ValueError("Pattern type must be 're' or 'fnmatch'")
-
-        # Filter datasets by run mode and/or name
-        for field_name, requested_value in (
-                ('name', run_id),
-                ('mode', run_mode)):
-
-            if requested_value is None:
-                continue
-
-            values = dsets[field_name].values
-            mask = np.zeros(len(values), dtype=np.bool_)
-
-            if pattern_type == 'fnmatch':
-                for i, x in enumerate(values):
-                    mask[i] = fnmatch.fnmatch(x, requested_value)
-            elif pattern_type == 're':
-                for i, x in enumerate(values):
-                    mask[i] = bool(re.match(requested_value, x))
-
-            dsets = dsets[mask]
-
-
-        if include_tags is not None:
-            dsets = dsets[_tags_match(dsets,
-                                      include_tags,
-                                      pattern_type,
-                                      ignore_underscore)]
-
-        if exclude_tags is not None:
-            dsets = dsets[True ^ _tags_match(dsets,
-                                             exclude_tags,
-                                             pattern_type,
-                                             ignore_underscore)]
-
-        have_available = strax.to_str_tuple(available)
-        for d in have_available:
-            if not d + '_available' in dsets.columns:
-                # Get extra availability info from the run db
-                self.runs[d + '_available'] = np.in1d(
-                    self.runs.name.values,
-                    self.list_available(d))
-            dsets = dsets[dsets[d + '_available']]
-
-        return dsets
-
-    def define_run(self,
-                   name: str,
-                   data: ty.Union[np.ndarray, pd.DataFrame, dict],
-                   from_run: ty.Union[str, None] = None):
-
-        if isinstance(data, (pd.DataFrame, np.ndarray)):
-            # Array of events / regions of interest
-            start, end = data['time'], strax.endtime(data)
-            if from_run is not None:
-                return self.define_run(
-                    name,
-                    {from_run: np.transpose([start, end])})
-            else:
-                df = pd.DataFrame(dict(starts=start, ends=end,
-                                       run_id=data['run_id']))
-                self.define_run(
-                    name,
-                    {run_id: rs[['start', 'stop']].values.transpose()
-                     for run_id, rs in df.groupby('fromrun')})
-
-        if isinstance(data, (list, tuple)):
-            # list of runids
-            data = strax.to_str_tuple(data)
-            self.define_run(
-                name,
-                {run_id: 'all' for run_id in data})
-
-        if not isinstance(data, dict):
-            raise ValueError("Can't define run from {type(data)}")
-
-        # Dict mapping run_id: array of time ranges or all
-        for sf in self.storage:
-            if not sf.readonly and sf.can_define_runs:
-                sf.define_run(name, data)
-                break
-        else:
-            raise RuntimeError("No storage frontend registered that allows"
-                               " run definition")
-
+    @classmethod
+    def add_method(cls, f):
+        """Add f as a new Context method"""
+        setattr(cls, f.__name__, f)
 
 
 get_docs = """
@@ -1063,70 +846,3 @@ def define_run(self,
         doc = attr_val.__doc__
         if doc is not None and '{get_docs}' in doc:
             attr_val.__doc__ = doc.format(get_docs=get_docs)
-
-
-def _tags_match(dsets, patterns, pattern_type, ignore_underscore):
-    result = np.zeros(len(dsets), dtype=np.bool)
-
-    if isinstance(patterns, str):
-        patterns = [patterns]
-
-    for i, tags in enumerate(dsets.tags):
-        result[i] = any([any([_tag_match(tag, pattern,
-                                         pattern_type,
-                                         ignore_underscore)
-                              for tag in tags.split(',')
-                              for pattern in patterns])])
-
-    return result
-
-
-def _tag_match(tag, pattern, pattern_type, ignore_underscore):
-    if ignore_underscore and tag.startswith('_'):
-        tag = tag[1:]
-    if pattern_type == 'fnmatch':
-        return fnmatch.fnmatch(tag, pattern)
-    elif pattern_type == 're':
-        return bool(re.match(pattern, tag))
-    raise NotImplementedError
-
-
-@export
-def multi_run(f, run_ids, *args, max_workers=None, **kwargs):
-    """Execute f(run_id, **kwargs) over multiple runs,
-    then return list of results.
-
-    :param run_ids: list/tuple of runids
-    :param max_workers: number of worker threads/processes to spawn
-
-    Other (kw)args will be passed to f
-    """
-    # Try to int all run_ids
-
-    # Get a numpy array of run ids.
-    try:
-        run_id_numpy = np.array([int(x) for x in run_ids],
-                                dtype=np.int32)
-    except ValueError:
-        # If there are string id's among them,
-        # numpy will autocast all the run ids to Unicode fixed-width
-        run_id_numpy = np.array(run_ids)
-
-    # Probably we'll want to use dask for this in the future,
-    # to enable cut history tracking and multiprocessing.
-    # For some reason the ProcessPoolExecutor doesn't work??
-    with ThreadPoolExecutor(max_workers=max_workers) as exc:
-        futures = [exc.submit(f, r, *args, **kwargs)
-                   for r in run_ids]
-        for _ in tqdm(as_completed(futures),
-                      desc="Loading %d runs" % len(run_ids)):
-            pass
-
-    result = []
-    for i, f in enumerate(futures):
-        r = f.result()
-        ids = np.array([run_id_numpy[i]] * len(r),
-                       dtype=[('run_id', run_id_numpy.dtype)])
-        r = strax.merge_arrs([ids, r])
-        result.append(r)
-    return result

diff --git a/strax/run_selection.py b/strax/run_selection.py
new file mode 100644
index 000000000..36e601af1
--- /dev/null
+++ b/strax/run_selection.py
@@ -0,0 +1,263 @@
+"""Context methods dealing with run scanning and selection"""
+import fnmatch
+import re
+import typing as ty
+
+import numpy as np
+import pandas as pd
+from tqdm import tqdm
+
+import strax
+
+
+@strax.Context.add_method
+def list_available(self, target, **kwargs):
+    """Return sorted list of run_id's for which target is available
+    """
+    # TODO duplicated code with get_iter
+    if len(kwargs):
+        # noinspection PyMethodFirstArgAssignment
+        self = self.new_context(**kwargs)
+
+    if self.runs is None:
+        self.scan_runs()
+
+    keys = set([
+        self.key_for(run_id, target)
+        for run_id in self.runs['name'].values])
+
+    found = set()
+    for sf in self.storage:
+        remaining = keys - found
+        is_found = sf.find_several(list(remaining), **self._find_options)
+        found |= set([k for i, k in enumerate(remaining)
+                      if is_found[i]])
+    return list(sorted([x.run_id for x in found]))
+
+
+@strax.Context.add_method
+def scan_runs(self,
+              check_available=tuple(),
+              store_fields=tuple()):
+    """Update and return self.runs with runs currently available
+    in all storage frontends.
+    :param check_available: Check whether these data types are available
+    Availability of xxx is stored as a boolean in the xxx_available
+    column.
+    :param store_fields: Additional fields from run doc to include
+    as columns in the dataframe.
+
+    The context options scan_availability and store_run_fields list
+    data types and run fields, respectively, that will always be scanned.
+    """
+    store_fields = tuple(set(
+        list(strax.to_str_tuple(store_fields))
+        + ['name', 'number', 'tags', 'mode']
+        + list(self.context_config['store_run_fields'])))
+    check_available = tuple(set(
+        list(strax.to_str_tuple(check_available))
+        + list(self.context_config['check_available'])))
+
+    docs = None
+    for sf in self.storage:
+        _temp_docs = []
+        for doc in sf._scan_runs(store_fields=store_fields):
+            # If there is no number, make one from the name
+            if 'number' not in doc:
+                if 'name' not in doc:
+                    raise ValueError(f"Invalid run doc {doc}, contains "
+                                     f"neither name nor number.")
+                doc['number'] = int(doc['name'])
+
+            # If there is no name, make one from the number
+            doc.setdefault('name', str(doc['number']))
+
+            doc.setdefault('mode', '')
+
+            # Flatten the tags field, if it exists
+            doc['tags'] = ','.join([t['name']
+                                    for t in doc.get('tags', [])])
+
+            # Flatten the rest of the doc (mainly in case the mode field
+            # is something deeply nested)
+            doc = strax.flatten_dict(doc, separator='.')
+
+            _temp_docs.append(doc)
+
+        if len(_temp_docs):
+            new_docs = pd.DataFrame(_temp_docs)
+        else:
+            new_docs = pd.DataFrame([], columns=store_fields)
+
+        if docs is None:
+            docs = new_docs
+        else:
+            # Keep only new runs (not found by earlier frontends)
+            docs = pd.concat([
+                docs,
+                new_docs[
+                    ~np.in1d(new_docs['name'], docs['name'])]],
+                sort=False)
+
+    self.runs = docs
+
+    for d in tqdm(check_available,
+                  desc='Checking data availability'):
+        self.runs[d + '_available'] = np.in1d(
+            self.runs.name.values,
+            self.list_available(d))
+
+    return self.runs
+
+
+@strax.Context.add_method
+def select_runs(self, run_mode=None, run_id=None,
+                include_tags=None, exclude_tags=None,
+                available=tuple(),
+                pattern_type='fnmatch', ignore_underscore=True):
+    """Return pandas.DataFrame with basic info from runs
+    that match selection criteria.
+    :param run_mode: Pattern to match run modes (reader.ini.name)
+    :param run_id: Pattern to match a run_id or run_ids
+    :param available: str or tuple of strs of data types for which data
+    must be available according to the runs DB.
+
+    :param include_tags: String or list of strings of patterns
+    for required tags
+    :param exclude_tags: String / list of strings of patterns
+    for forbidden tags.
+    Exclusion criteria have higher priority than inclusion criteria.
+    :param pattern_type: Type of pattern matching to use.
+    Defaults to 'fnmatch', which means you can use
+    unix shell-style wildcards (`?`, `*`).
+    The alternative is 're', which means you can use
+    full python regular expressions.
+    :param ignore_underscore: Ignore the underscore at the start of tags
+    (indicating some degree of officialness or automation).
+
+    Examples:
+     - `select_runs(include_tags='blinded')`
+       select all datasets with a blinded or _blinded tag.
+     - `select_runs(include_tags='*blinded')`
+       ... with blinded or _blinded, unblinded, blablinded, etc.
+     - `select_runs(include_tags=['blinded', 'unblinded'])`
+       ... with blinded OR unblinded, but not blablinded.
+     - `select_runs(include_tags='blinded',
+                    exclude_tags=['bad', 'messy'])`
+       select blinded datasets that aren't bad or messy
+    """
+    if self.runs is None:
+        self.scan_runs()
+    dsets = self.runs.copy()
+
+    if pattern_type not in ('re', 'fnmatch'):
+        raise ValueError("Pattern type must be 're' or 'fnmatch'")
+
+    # Filter datasets by run mode and/or name
+    for field_name, requested_value in (
+            ('name', run_id),
+            ('mode', run_mode)):
+
+        if requested_value is None:
+            continue
+
+        values = dsets[field_name].values
+        mask = np.zeros(len(values), dtype=np.bool_)
+
+        if pattern_type == 'fnmatch':
+            for i, x in enumerate(values):
+                mask[i] = fnmatch.fnmatch(x, requested_value)
+        elif pattern_type == 're':
+            for i, x in enumerate(values):
+                mask[i] = bool(re.match(requested_value, x))
+
+        dsets = dsets[mask]
+
+    if include_tags is not None:
+        dsets = dsets[_tags_match(dsets,
+                                  include_tags,
+                                  pattern_type,
+                                  ignore_underscore)]
+
+    if exclude_tags is not None:
+        dsets = dsets[True ^ _tags_match(dsets,
+                                         exclude_tags,
+                                         pattern_type,
+                                         ignore_underscore)]
+
+    have_available = strax.to_str_tuple(available)
+    for d in have_available:
+        if not d + '_available' in dsets.columns:
+            # Get extra availability info from the run db
+            self.runs[d + '_available'] = np.in1d(
+                self.runs.name.values,
+                self.list_available(d))
+        dsets = dsets[dsets[d + '_available']]
+
+    return dsets
+
+
+@strax.Context.add_method
+def define_run(self,
+               name: str,
+               data: ty.Union[np.ndarray, pd.DataFrame, dict],
+               from_run: ty.Union[str, None] = None):
+    if isinstance(data, (pd.DataFrame, np.ndarray)):
+        # Array of events / regions of interest
+        start, end = data['time'], strax.endtime(data)
+        if from_run is not None:
+            return self.define_run(
+                name,
+                {from_run: np.transpose([start, end])})
+        else:
+            df = pd.DataFrame(dict(starts=start, ends=end,
+                                   run_id=data['run_id']))
+            self.define_run(
+                name,
+                {run_id: rs[['starts', 'ends']].values.transpose()
+                 for run_id, rs in df.groupby('run_id')})
+
+    if isinstance(data, (list, tuple)):
+        # list of runids
+        data = strax.to_str_tuple(data)
+        self.define_run(
+            name,
+            {run_id: 'all' for run_id in data})
+
+    if not isinstance(data, dict):
+        raise ValueError(f"Can't define run from {type(data)}")
+
+    # Dict mapping run_id: array of time ranges or all
+    for sf in self.storage:
+        if not sf.readonly and sf.can_define_runs:
+            sf.define_run(name, data)
+            break
+    else:
+        raise RuntimeError("No storage frontend registered that allows"
+                           " run definition")
+
+
+def _tags_match(dsets, patterns, pattern_type, ignore_underscore):
+    result = np.zeros(len(dsets), dtype=np.bool)
+
+    if isinstance(patterns, str):
+        patterns = [patterns]
+
+    for i, tags in enumerate(dsets.tags):
+        result[i] = any([any([_tag_match(tag, pattern,
+                                         pattern_type,
+                                         ignore_underscore)
+                              for tag in tags.split(',')
+                              for pattern in patterns])])
+
+    return result
+
+
+def _tag_match(tag, pattern, pattern_type, ignore_underscore):
+    if ignore_underscore and tag.startswith('_'):
+        tag = tag[1:]
+    if pattern_type == 'fnmatch':
+        return fnmatch.fnmatch(tag, pattern)
+    elif pattern_type == 're':
+        return bool(re.match(pattern, tag))
+    raise NotImplementedError
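
A sketch of the new extension hook in use (hypothetical method name and tags; a real context also needs storage frontends and plugins configured). Context.add_method simply attaches a function as a method, which is how run_selection.py registers list_available, scan_runs, select_runs and define_run at import time; downstream code can register its own helpers the same way:

    import strax

    @strax.Context.add_method
    def count_tagged(self, tag):
        """Count known runs carrying the given tag (illustration only)."""
        if self.runs is None:
            self.scan_runs()
        # scan_runs flattens each run's tags into a comma-separated string
        return int(self.runs['tags'].str.contains(tag).sum())

    st = strax.Context()
    good_runs = st.select_runs(include_tags='blinded',
                               exclude_tags=['bad', 'messy'])
    print(st.count_tagged('blinded'))
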
From 3f508c34083d096ffda279f3d39ef2e2e0bd04f1 Mon Sep 17 00:00:00 2001
From: Jelle Aalbers
Date: Sun, 30 Jun 2019 13:50:33 +0200
Subject: [PATCH 4/9] group_by_kind and multi_run to utils

---
 strax/context.py |  14 +++----
 strax/plugin.py  |  71 +++++++++++++++++----------------
 strax/utils.py   | 100 ++++++++++++++++++++++++++++++++++++++++++++---
 3 files changed, 137 insertions(+), 48 deletions(-)

diff --git a/strax/context.py b/strax/context.py
index 10062e361..75bdfd4ca 100644
--- a/strax/context.py
+++ b/strax/context.py
@@ -1,10 +1,8 @@
-from concurrent.futures import ThreadPoolExecutor, as_completed
 import collections
 import logging
 import fnmatch
 from functools import partial
 import random
-import re
 import string
 import typing as ty
 import warnings
@@ -12,12 +10,10 @@
 import numexpr
 import numpy as np
 import pandas as pd
-from tqdm import tqdm
 
 import strax
 export, __all__ = strax.exporter()
 
-
 @strax.takes_config(
     strax.Option(name='storage_converter', default=False,
                  help='If True, save data that is loaded from one frontend '
@@ -713,8 +709,9 @@ def make(self, run_id: ty.Union[str, tuple, list],
         # Multi-run support
         run_ids = strax.to_str_tuple(run_id)
         if len(run_ids) > 1:
-            return multi_run(self, run_ids, targets=targets,
-                             save=save, max_workers=max_workers, **kwargs)
+            return strax.multi_run(
+                self.make, run_ids, targets=targets,
+                save=save, max_workers=max_workers, **kwargs)
         for _ in self.get_iter(run_ids[0], targets,
                                save=save, max_workers=max_workers, **kwargs):
             pass
@@ -728,8 +725,9 @@ def get_array(self, run_id: ty.Union[str, tuple, list],
         """
         run_ids = strax.to_str_tuple(run_id)
         if len(run_ids) > 1:
-            results = multi_run(self.get_array, run_ids, targets=targets,
-                                save=save, max_workers=max_workers, **kwargs)
+            results = strax.multi_run(
+                self.get_array, run_ids, targets=targets,
+                save=save, max_workers=max_workers, **kwargs)
         else:
             results = list(self.get_iter(run_ids[0], targets,
                                          save=save, max_workers=max_workers,

diff --git a/strax/plugin.py b/strax/plugin.py
index 8d7ba9b71..f2b971ddb 100644
--- a/strax/plugin.py
+++ b/strax/plugin.py
@@ -139,40 +139,43 @@ def metadata(self, run_id, data_type):
             lineage=self.lineage)
 
     def dependencies_by_kind(self, require_time=None):
-        """Return dependencies grouped by data kind
-        i.e. {kind1: [dep0, dep1], kind2: [dep, dep]}
-        :param require_time: If True, one dependency of each kind
-        must provide time information. It will be put first in the list.
-
-        If require_time is omitted, we will require time only if there is
-        more than one data kind in the dependencies.
-        """
-        if require_time is None:
-            require_time = \
-                len(self.dependencies_by_kind(require_time=False)) > 1
-
-        deps_by_kind = dict()
-        key_deps = []
-        for d in self.depends_on:
-            k = self.deps[d].data_kind_for(d)
-            deps_by_kind.setdefault(k, [])
-
-            # If this has time information, put it first in the list
-            if (require_time
-                    and 'time' in self.deps[d].dtype.names):
-                key_deps.append(d)
-                deps_by_kind[k].insert(0, d)
-            else:
-                deps_by_kind[k].append(d)
-
-        if require_time:
-            for k, d in deps_by_kind.items():
-                if not d[0] in key_deps:
-                    raise ValueError(f"For {self.__class__.__name__}, no "
-                                     f"dependency of data kind {k} "
-                                     "has time information!")
-
-        return deps_by_kind
+        return strax.group_by_kind(
+            self.depends_on,
+            plugins=self.deps,
+            require_time=require_time)
+        # """Return dependencies grouped by data kind
+        # i.e. {kind1: [dep0, dep1], kind2: [dep, dep]}
+        # :param require_time: If True, one dependency of each kind
+        # must provide time information. It will be put first in the list.
+        #
+        # If require_time is omitted, we will require time only if there is
+        # more than one data kind in the dependencies.
+        # """
+        # if require_time is None:
+        #     require_time = \
+        #         len(self.dependencies_by_kind(require_time=False)) > 1
+        #
+        # deps_by_kind = dict()
+        # key_deps = []
+        # for d in self.depends_on:
+        #     k = self.deps[d].data_kind_for(d)
+        #     deps_by_kind.setdefault(k, [])
+        #
+        #     # If this has time information, put it first in the list
+        #     if (require_time
+        #             and 'time' in self.deps[d].dtype.names):
+        #         key_deps.append(d)
+        #         deps_by_kind[k].insert(0, d)
+        #     else:
+        #         deps_by_kind[k].append(d)
+        #
+        # if require_time:
+        #     for k, d in deps_by_kind.items():
+        #         if not d[0] in key_deps:
+        #             raise ValueError(f"For {self.__class__.__name__}, no "
+        #                              f"dependency of data kind {k} "
+        #                              "has time information!")
+        # return deps_by_kind
 
     def is_ready(self, chunk_i):
         """Return whether the chunk chunk_i is ready for reading.

diff --git a/strax/utils.py b/strax/utils.py
index c614f7c77..77ea6da9e 100644
--- a/strax/utils.py
+++ b/strax/utils.py
@@ -1,19 +1,20 @@
 from base64 import b32encode
 import collections
+from concurrent.futures import ThreadPoolExecutor, as_completed
 import contextlib
 from functools import wraps
 import json
 import re
 import sys
 import traceback
-import typing
+import typing as ty
 from hashlib import sha1
 
+import dill
+import numba
 import numpy as np
+from tqdm import tqdm
 import pandas as pd
-import numba
-import dill
-
 # Change numba's caching backend from pickle to dill
 # I'm sure they don't mind...
@@ -203,7 +204,7 @@ def profile_threaded(filename):
 
 
 @export
-def to_str_tuple(x) -> typing.Tuple[str]:
+def to_str_tuple(x) -> ty.Tuple[str]:
     if isinstance(x, str):
         return x,
     elif isinstance(x, list):
@@ -372,4 +373,91 @@ def dict_to_rec(x, dtype=None):
         r = np.zeros(n, dtype=dtype)
     for k, v in x.items():
         r[k] = v
-    return r
\ No newline at end of file
+    return r
+
+
+@export
+def multi_run(f, run_ids, *args, max_workers=None, **kwargs):
+    """Execute f(run_id, **kwargs) over multiple runs,
+    then return list of results.
+
+    :param run_ids: list/tuple of runids
+    :param max_workers: number of worker threads/processes to spawn
+
+    Other (kw)args will be passed to f
+    """
+    # Try to int all run_ids
+
+    # Get a numpy array of run ids.
+    try:
+        run_id_numpy = np.array([int(x) for x in run_ids],
+                                dtype=np.int32)
+    except ValueError:
+        # If there are string id's among them,
+        # numpy will autocast all the run ids to Unicode fixed-width
+        run_id_numpy = np.array(run_ids)
+
+    # Probably we'll want to use dask for this in the future,
+    # to enable cut history tracking and multiprocessing.
+    # For some reason the ProcessPoolExecutor doesn't work??
+    with ThreadPoolExecutor(max_workers=max_workers) as exc:
+        futures = [exc.submit(f, r, *args, **kwargs)
+                   for r in run_ids]
+        for _ in tqdm(as_completed(futures),
+                      desc="Loading %d runs" % len(run_ids)):
+            pass
+
+    result = []
+    for i, f in enumerate(futures):
+        r = f.result()
+        ids = np.array([run_id_numpy[i]] * len(r),
+                       dtype=[('run_id', run_id_numpy.dtype)])
+        r = merge_arrs([ids, r])
+        result.append(r)
+    return result
+
+
+@export
+def group_by_kind(dtypes, plugins=None, context=None,
+                  require_time=None) -> ty.Dict[str, ty.List]:
+    """Return dtypes grouped by data kind
+    i.e. {kind1: [d, d, ...], kind2: [d, d, ...], ...}
+    :param plugins: plugins providing the dtypes.
+    :param context: context to get plugins from if not given.
+    :param require_time: If True, one data type of each kind
+    must provide time information. It will be put first in the list.
+
+    If require_time is None (default), we will require time only if there
+    is more than one data kind in dtypes.
+    """
+    if plugins is None:
+        if context is None:
+            raise RuntimeError("group_by_kind requires plugins or context")
+        plugins = context._get_plugins(targets=dtypes, run_id='0')
+
+    if require_time is None:
+        require_time = len(group_by_kind(
+            dtypes, plugins=plugins, context=context, require_time=False)) > 1
+
+    deps_by_kind = dict()
+    key_deps = []
+    for d in dtypes:
+        p = plugins[d]
+        k = p.data_kind_for(d)
+        deps_by_kind.setdefault(k, [])
+
+        # If this has time information, put it first in the list
+        if (require_time
+                and 'time' in p.dtype_for(d).names):
+            key_deps.append(d)
+            deps_by_kind[k].insert(0, d)
+        else:
+            deps_by_kind[k].append(d)
+
+    if require_time:
+        for k, d in deps_by_kind.items():
+            if not d[0] in key_deps:
+                raise ValueError(f"No dependency of data kind {k} "
+                                 "has time information!")
+
+    return deps_by_kind
From 7c0b9f3aa9352f667e38ef81d5e487a9c6a89178 Mon Sep 17 00:00:00 2001
From: Jelle Aalbers
Date: Sun, 30 Jun 2019 13:51:16 +0200
Subject: [PATCH 5/9] Fix dict_to_rec if dtype not provided

---
 strax/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/strax/utils.py b/strax/utils.py
index 77ea6da9e..23b15170b 100644
--- a/strax/utils.py
+++ b/strax/utils.py
@@ -362,8 +362,8 @@ def dict_to_rec(x, dtype=None):
     if dtype is None:
         if not len(x):
             raise ValueError("Cannot infer dtype from empty dict")
-        dtype = to_numpy_dtype([(k, v.dtype)
-                                for k, v in x])
+        dtype = to_numpy_dtype([(k, np.asarray(v).dtype)
+                                for k, v in x.items()])
 
     if not len(x):
         return np.empty(0, dtype=dtype)

From 97300f5814d41bcb867d1ba3fa26fbc61c39a504 Mon Sep 17 00:00:00 2001
From: Jelle Aalbers
Date: Sun, 30 Jun 2019 13:51:37 +0200
Subject: [PATCH 6/9] Make endtime work on dataframes

---
 strax/processing/general.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/strax/processing/general.py b/strax/processing/general.py
index 015d15a85..1dcdc6c18 100644
--- a/strax/processing/general.py
+++ b/strax/processing/general.py
@@ -38,9 +38,10 @@ def first_index_not_below(arr, t):
 @export
 def endtime(x):
     """Return endtime of intervals x"""
-    if 'endtime' in x.dtype.names:
+    try:
         return x['endtime']
-    return x['time'] + x['length'] * x['dt']
+    except (KeyError, ValueError, IndexError):
+        return x['time'] + x['length'] * x['dt']
 
 
 @export
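
Both fixes can be exercised directly; a small sketch, assuming dict_to_rec and endtime are exported on the strax namespace as usual:

    import numpy as np
    import pandas as pd
    import strax

    # dtype inference now iterates x.items() and uses np.asarray(v).dtype
    rec = strax.dict_to_rec(dict(time=np.array([0, 10]),
                                 area=np.array([1.5, 2.5])))

    # Without an 'endtime' field, endtime falls back to time + length * dt;
    # indexing a DataFrame raises KeyError, which is now caught
    df = pd.DataFrame(dict(time=[0, 10], length=[5, 5], dt=[2, 2]))
    print(strax.endtime(df))  # 10 and 20
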
From 429fd7c393401ed0172178ed9d2721c812d45677 Mon Sep 17 00:00:00 2001
From: Jelle Aalbers
Date: Sun, 30 Jun 2019 13:53:15 +0200
Subject: [PATCH 7/9] Refactor (time) selections, fix #181

---
 strax/context.py | 182 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 132 insertions(+), 50 deletions(-)

diff --git a/strax/context.py b/strax/context.py
index 75bdfd4ca..7ad039ac4 100644
--- a/strax/context.py
+++ b/strax/context.py
@@ -431,15 +431,16 @@ def get_components(self, run_id: str,
         n_range = None
         if time_range is not None:
             # Ensure we have one data kind
-            if len(set([plugins[t].data_kind for t in targets])) > 1:
+            if len(set([plugins[t].data_kind_for(t) for t in targets])) > 1:
                 raise NotImplementedError(
                     "Time range selection not implemented "
                     "for multiple data kinds.")
 
             # Which plugin provides time information? We need it to map to
             # row indices.
-            for p in targets:
-                if 'time' in plugins[p].dtype.names:
+            for d in targets:
+                if 'time' in plugins[d].dtype_for(d).names:
+                    d_with_time = d
                     break
             else:
                 raise RuntimeError(f"No time info in targets, should have been"
                                    f" caught earlier")
@@ -448,11 +449,29 @@
             # Find a range of row numbers that contains the time range
             # It's a bit too large: to
             # Get the n <-> time mapping in needed chunks
-            if not self.is_stored(run_id, p):
-                raise strax.DataNotAvailable(f"Time range selection needs time"
-                                             f" info from {p}, but this data"
-                                             f" is not yet available")
-            meta = self.get_meta(run_id, p)
+            if d_with_time.startswith('_temp'):
+                # This is a merge-only data type, which is never stored.
+                # Get the time info from one of its dependencies
+                deps_to_check = plugins[d_with_time].depends_on
+                for d in deps_to_check:
+                    if (d in plugins
+                            and 'time' in plugins[d].dtype_for(d).names):
+                        d_with_time = d
+                        break
+                else:
+                    raise RuntimeError(
+                        "Cannot use time range selection "
+                        f"since none of the dependencies {deps_to_check} "
+                        "of the MergeOnlyPlugin provide time information")
+
+                d_with_time = plugins[d_with_time].depends_on[0]
+
+            if not self.is_stored(run_id, d_with_time):
+                raise strax.DataNotAvailable(
+                    "Time range selection needs time info from "
+                    f"{d_with_time}, but this data is not yet available")
+
+            meta = self.get_meta(run_id, d_with_time)
             times = np.array([c['first_time'] for c in meta['chunks']])
             # Reconstruct row numbers from row counts, which are in metadata
             # n_end is last row + 1 in a chunk. n_start is the first.
@@ -501,7 +520,7 @@ def check_cache(d):
                     break
             else:
                 # Data not found anywhere. We will be computing it.
-                if time_range is not None:
+                if time_range is not None and not d.startswith('_temp'):
                     # While the data type providing the time information is
                     # available (else we'd have failed earlier), one of the
                     # other requested data types is not.
@@ -604,20 +623,61 @@
         # For the plugins which will run computations,
         # check all required options are available or set defaults.
         # Also run any user-defined setup
-        for p in plugins.values():
-            self._set_plugin_config(p, run_id, tolerant=False)
-            p.setup()
+        for d in plugins.values():
+            self._set_plugin_config(d, run_id, tolerant=False)
+            d.setup()
 
         return strax.ProcessorComponents(
             plugins=plugins,
             loaders=loaders,
             savers=dict(savers),
            targets=targets)
 
+    def estimate_run_start(self, run_id, targets):
+        """Return run start time in ns since epoch.
+
+        This fetches from run metadata, and if this fails, it
+        estimates it using data metadata from targets.
+        """
+        try:
+            # Use run metadata, if it is available, to get
+            # the run start time (floored to seconds)
+            t0 = self.run_metadata(run_id, 'start')['start']
+            t0 = int(t0.timestamp()) * int(1e9)
+        except strax.RunMetadataNotAvailable:
+            if targets is None:
+                raise
+            t = strax.to_str_tuple(targets)[0]
+            # Get an approx start from the data itself,
+            # then floor it to seconds for consistency
+            t0 = self.get_meta(run_id, t)['chunks'][0]['first_time']
+            t0 = (int(t0) // int(1e9)) * int(1e9)
+        return t0
+
+    def to_absolute_time_range(self, run_id, targets, time_range=None,
+                               seconds_range=None, time_within=None):
+        if ((time_range is None)
+                + (seconds_range is None)
+                + (time_within is None)
+                < 2):
+            raise RuntimeError("Pass no more than one of"
+                               " time_range, seconds_range, or time_within")
+        if seconds_range is not None:
+            t0 = self.estimate_run_start(run_id, targets)
+            time_range = (t0 + int(1e9 * seconds_range[0]),
+                          t0 + int(1e9 * seconds_range[1]))
+        if time_within is not None:
+            time_range = (time_within['time'], strax.endtime(time_within))
+        return time_range
+
     def get_iter(self, run_id: str,
                  targets, save=tuple(), max_workers=None,
                  time_range=None,
                  seconds_range=None,
-                 selection=None,
+                 time_within=None,
+                 time_selection='fully_contained',
+                 selection_str=None,
                  **kwargs) -> ty.Iterator[np.ndarray]:
         """Compute target for run_id and iterate over results.
@@ -631,27 +691,14 @@
             # noinspection PyMethodFirstArgAssignment
             self = self.new_context(**kwargs)
 
-        if isinstance(selection, (list, tuple)):
-            selection = ' & '.join(f'({x})' for x in selection)
+        if isinstance(selection_str, (list, tuple)):
+            selection_str = ' & '.join(f'({x})' for x in selection_str)
 
-        # Convert relative to absolute time range
-        if seconds_range is not None:
-            try:
-                # Use run metadata, if it is available, to get
-                # the run start time (floored to seconds)
-                t0 = self.run_metadata(run_id, 'start')['start']
-                t0 = int(t0.timestamp()) * int(1e9)
-            except Exception:
-                # Get an approx start from the data itself,
-                # then floor it to seconds for consistency
-                if isinstance(targets, (list, tuple)):
-                    t = targets[0]
-                else:
-                    t = targets
-                t0 = self.get_meta(run_id, t)['chunks'][0]['first_time']
-                t0 = int(t0 / int(1e9)) * int(1e9)
-            time_range = (t0 + int(1e9) * seconds_range[0],
-                          t0 + int(1e9) * seconds_range[1])
+        # Convert alternate time arguments to absolute range
+        time_range = self.to_absolute_time_range(
+            run_id=run_id, targets=targets,
+            time_range=time_range, seconds_range=seconds_range,
+            time_within=time_within)
 
         # If multiple targets of the same kind, create a MergeOnlyPlugin
         # to merge the results automatically
@@ -686,20 +733,46 @@
             if not isinstance(x, np.ndarray):
                 raise ValueError(f"Got type {type(x)} rather than numpy array "
                                  "from the processor!")
-            if selection is not None:
-                mask = numexpr.evaluate(selection, local_dict={
-                    fn: x[fn]
-                    for fn in x.dtype.names})
-                x = x[mask]
-            if time_range:
-                if 'time' not in x.dtype.names:
-                    raise NotImplementedError(
-                        "Time range selection requires time information, "
-                        "but none of the required plugins provides it.")
-                x = x[(time_range[0] <= x['time']) &
-                      (x['time'] < time_range[1])]
+            x = self.apply_selection(x, selection_str,
+                                     time_range, time_selection)
             yield x
 
+    def apply_selection(self, x, selection_str=None,
+                        time_range=None,
+                        time_selection='fully_contained'):
+        """Return x after applying selections
+
+        :param selection_str: Query string or sequence of strings to apply.
+        :param time_range: (start, stop) range to load, in ns since the epoch
+        :param time_selection: Kind of time selection to apply:
+        - fully_contained: (default) select things fully contained in the range
+        - touching: select things that (partially) overlap with the range
+        - skip: Do not select a time range, even if other arguments say so
+        """
+        # Apply the time selections
+        if time_range is None or time_selection == 'skip':
+            pass
+        elif 'time' not in x.dtype.names:
+            raise NotImplementedError(
+                "Time range selection requires time information, "
+                "but none of the required plugins provides it.")
+        elif time_selection == 'fully_contained':
+            x = x[(time_range[0] <= x['time']) &
+                  (strax.endtime(x) < time_range[1])]
+        elif time_selection == 'touching':
+            x = x[(strax.endtime(x) > time_range[0]) &
+                  (x['time'] < time_range[1])]
+        else:
+            raise ValueError(f"Unknown time_selection {time_selection}")
+
+        if selection_str:
+            mask = numexpr.evaluate(selection_str, local_dict={
+                fn: x[fn]
+                for fn in x.dtype.names})
+            x = x[mask]
+
+        return x
+
     def make(self, run_id: ty.Union[str, tuple, list],
              targets, save=tuple(), max_workers=None,
             **kwargs) -> None:
@@ -824,6 +897,18 @@ def add_method(cls, f):
         setattr(cls, f.__name__, f)
 
 
+select_docs = """
+:param selection_str: Query string or sequence of strings to apply.
+:param time_range: (start, stop) range to load, in ns since the epoch
+:param seconds_range: (start, stop) range of seconds since
+the start of the run to load.
+:param time_within: row of strax data (e.g. event) to use as time range
+:param time_selection: Kind of time selection to apply:
+- fully_contained: (default) select things fully contained in the range
+- touching: select things that (partially) overlap with the range
+- skip: Do not select a time range, even if other arguments say so
+"""
+
 get_docs = """
 :param run_id: run id to get
 :param targets: list/tuple of strings of data type names to get
 :param save: extra data types you would like to save to cache, if they occur
     in intermediate computations. Many plugins save automatically anyway.
 :param max_workers: Number of worker threads/processes to spawn.
     In practice more CPUs may be used due to strax's multithreading.
-:param selection: Query string or list of strings with selections to apply.
-:param time_range: (start, stop) range of ns since the unix epoch to load
-:param seconds_range: (start, stop) range of seconds since the start of the
-run to load.
-"""
+""" + select_docs
+
 
 for attr in dir(Context):
     attr_val = getattr(Context, attr)
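
The refactored selection interface in use; a sketch with a hypothetical run id, data type and cut field, assuming a fully configured context:

    import strax

    st = strax.Context()

    # seconds_range is converted to an absolute (start, stop) ns range by
    # to_absolute_time_range(); 'touching' keeps anything overlapping it
    events = st.get_array('180423_1021', 'events',
                          seconds_range=(0, 60),
                          time_selection='touching',
                          selection_str='s1_area > 10')
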
-""" +""" + select_docs + for attr in dir(Context): attr_val = getattr(Context, attr) From 2736d283ad71f11cfadf4ef2696234c16ce2044e Mon Sep 17 00:00:00 2001 From: Jelle Aalbers Date: Sat, 6 Jul 2019 16:21:26 +0200 Subject: [PATCH 8/9] Update HISTORY for the release --- HISTORY.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 65ac07460..ebec87638 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,7 @@ +0.7.5 / 2019-07-06 +------------------ +- Time selection refactor and context extensibility (#195) + 0.7.4 / 2019-06-26 ------------------- - Fix availability checks (#194) From 61e07aa056f61db78359baca61f8506a7104adec Mon Sep 17 00:00:00 2001 From: JelleAalbers Date: Sat, 6 Jul 2019 16:22:01 +0200 Subject: [PATCH 9/9] =?UTF-8?q?Bump=20version:=200.7.4=20=E2=86=92=200.7.5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .bumpversion.cfg | 2 +- docs/source/conf.py | 4 ++-- setup.py | 2 +- strax/__init__.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index a623bf396..82866ce95 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.7.4 +current_version = 0.7.5 files = setup.py strax/__init__.py docs/source/conf.py commit = True tag = True diff --git a/docs/source/conf.py b/docs/source/conf.py index c17e7d081..9750a3568 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -65,9 +65,9 @@ # built documents. # # The short X.Y version. -version = '0.7.4' +version = '0.7.5' # The full version, including alpha/beta/rc tags. -release = '0.7.4' +release = '0.7.5' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/setup.py b/setup.py index d0b0ebfc3..126937cc4 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ history = file.read() setuptools.setup(name='strax', - version='0.7.4', + version='0.7.5', description='Streaming analysis for xenon TPCs', author='Jelle Aalbers', url='https://github.com/AxFoundation/strax', diff --git a/strax/__init__.py b/strax/__init__.py index 48b4804a7..bf1abf869 100644 --- a/strax/__init__.py +++ b/strax/__init__.py @@ -1,5 +1,5 @@ # flake8: noqa -__version__ = '0.7.4' +__version__ = '0.7.5' # Glue the package together # See https://www.youtube.com/watch?v=0oTh1CXRaQ0 if this confuses you