Skip to content

Commit

Permalink
MNT #502 enable v1 or v2 query per #521 review
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Jul 16, 2021
1 parent 0f15e7e commit ec8305a
Showing 1 changed file with 92 additions and 16 deletions.
108 changes: 92 additions & 16 deletions apstools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
~command_list_as_table
~connect_pvlist
~copy_filtered_catalog
~db_query
~device_read2table
~dictionary_table
~EmailNotifications
Expand Down Expand Up @@ -103,6 +104,9 @@

MAX_EPICS_STRINGOUT_LENGTH = 40

FIRST_DATA = "1995-01-01"
LAST_DATA = "2100-12-31"


class ExcelReadError(openpyxl.utils.exceptions.InvalidFileException):
"""
Expand Down Expand Up @@ -298,9 +302,9 @@ def getDatabase(db=None, catalog_name=None):
db
*object* :
Instance of databroker v1 ``Broker`` or
v2 ``BlueskyMongoCatalog``.
Bluesky database, an instance of ``databroker.catalog``
(default: see ``catalog_name`` keyword argument)
catalog_name
*str* :
Name of databroker v2 catalog, used when supplied
Expand Down Expand Up @@ -397,7 +401,56 @@ def getDefaultDatabase():
return sorted(choices)[-1]


def getRunData(scan_id, db=None, stream="primary", query=None):
def db_query(db, query):
"""
Searches the databroker v2 database.
PARAMETERS
db
*object* :
Bluesky database, an instance of ``databroker.catalog``.
query
*dict* :
Search parameters.
RETURNS
*object* :
Bluesky database, an instance of ``databroker.catalog``
satisfying the ``query`` parameters.
See also
--------
:func:`databroker.catalog.search`
"""

if query is None:
return db

since = query.pop("since", None)
until = query.pop("until", None)

if since or until:
if not since:
since = FIRST_DATA
if not until:
until = LAST_DATA

_db = db.v2.search(
databroker.queries.TimeRange(since=since, until=until)
)
else:
_db = db

if len(query) != 0:
_db = _db.v2.search(query)

return _db


def getRunData(scan_id, db=None, stream="primary", query=None, use_v1=True):
"""
Convenience function to get the run's data. Default is the ``primary`` stream.
Expand All @@ -413,7 +466,7 @@ def getRunData(scan_id, db=None, stream="primary", query=None):
db
*object* :
Instance of databroker v1 ``Broker`` or v2 ``BlueskyMongoCatalog``.
Bluesky database, an instance of ``databroker.catalog``.
Default: will search existing session for instance.
stream
Expand All @@ -428,19 +481,32 @@ def getRunData(scan_id, db=None, stream="primary", query=None):
see: https://docs.mongodb.com/manual/reference/operator/query/
use_v1
*bool* :
Chooses databroker API version between 'v1' or 'v2'.
Default: ``True`` (meaning use the v1 API)
(new in apstools 1.5.1)
"""
cat = getCatalog(db).v2.search(query or {})
cat = getCatalog(db.v2)
if query:
cat = db_query(cat, query)

stream = stream or "primary"

run = cat[scan_id]
if not hasattr(run, stream):
raise AttributeError(f"No such stream '{stream}' in run '{scan_id}'.")
if use_v1 is None or use_v1:
run = cat.v1[scan_id]
if stream in run.stream_names:
return run.table(stream_name=stream)
else:
run = cat.v2[scan_id]
if hasattr(run, stream):
return run[stream].read().to_dataframe()

return getattr(run, stream).read().to_dataframe()
raise AttributeError(f"No such stream '{stream}' in run '{scan_id}'.")


def getRunDataValue(scan_id, key, db=None, stream="primary", query=None, idx=-1):
def getRunDataValue(scan_id, key, db=None, stream="primary", query=None, idx=-1, use_v1=True):
"""
Convenience function to get value of key in run stream.
Expand All @@ -463,7 +529,7 @@ def getRunDataValue(scan_id, key, db=None, stream="primary", query=None, idx=-1)
db
*object* :
Instance of databroker v1 ``Broker`` or v2 ``BlueskyMongoCatalog``.
Bluesky database, an instance of ``databroker.catalog``.
Default: will search existing session for instance.
stream
Expand All @@ -485,6 +551,11 @@ def getRunDataValue(scan_id, key, db=None, stream="primary", query=None, idx=-1)
for average value, or ``"all"`` for the full list of values.
Default: ``-1``
use_v1
*bool* :
Chooses databroker API version between 'v1' or 'v2'.
Default: ``True`` (meaning use the v1 API)
(new in apstools 1.5.1)
"""
if idx is None:
Expand Down Expand Up @@ -519,7 +590,7 @@ def getRunDataValue(scan_id, key, db=None, stream="primary", query=None, idx=-1)


def listRunKeys(
scan_id, key_fragment="", db=None, stream="primary", query=None, strict=False
scan_id, key_fragment="", db=None, stream="primary", query=None, strict=False, use_v1=True
):
"""
Convenience function to list all keys (column names) in the scan's stream (default: primary).
Expand All @@ -542,7 +613,7 @@ def listRunKeys(
db
*object* :
Instance of databroker v1 ``Broker`` or v2 ``BlueskyMongoCatalog``.
Bluesky database, an instance of ``databroker.catalog``.
Default: will search existing session for instance.
stream
Expand All @@ -563,9 +634,14 @@ def listRunKeys(
or matched by lower case comparison (``strict=False``)?
Default: ``False``
use_v1
*bool* :
Chooses databroker API version between 'v1' or 'v2'.
Default: ``True`` (meaning use the v1 API)
(new in apstools 1.5.1)
"""
table = getRunData(scan_id, db=db, stream=stream, query=query)
table = getRunData(scan_id, db=db, stream=stream, query=query, use_v1=use_v1)

# fmt: off
if len(key_fragment):
Expand Down Expand Up @@ -751,8 +827,8 @@ def _check_cat(self):
self.cat = getCatalog()

def _apply_search_filters(self):
since = self.since or "1995-01-01"
until = self.until or "2100-12-31"
since = self.since or FIRST_DATA
until = self.until or LAST_DATA
self._check_cat()
query = {}
query.update(databroker.queries.TimeRange(since=since, until=until))
Expand Down

0 comments on commit ec8305a

Please sign in to comment.