Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add findpv(PVNAME) utility function #497

Merged
merged 18 commits into from
Apr 1, 2021
208 changes: 205 additions & 3 deletions apstools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
~ExcelDatabaseFileBase
~ExcelDatabaseFileGeneric
~ExcelReadError
~findname
~findpv
~findCatalogsInNamespace
~full_dotted_name
~getCatalog
Expand Down Expand Up @@ -863,7 +865,7 @@ def listruns_v1_4(

Out[100]: <pyRestTable.rest_table.Table at 0x7f004e4d4898>

*new in apstools release 1.1.10*
(new in apstools release 1.1.10)
"""
warnings.warn(
"DEPRECATED: listruns_v1_4() will be removed"
Expand Down Expand Up @@ -1078,7 +1080,7 @@ def replay(headers, callback=None, sort=True):
Sort the headers chronologically if True.
(default:``True``)

*new in apstools release 1.1.11*
(new in apstools release 1.1.11)
"""
callback = callback or ipython_shell_namespace().get(
"bec", # get from IPython shell
Expand Down Expand Up @@ -1225,7 +1227,7 @@ def listobjects(show_pv=True, printing=True, verbose=False, symbols=None):

In [2]:

*new in apstools release 1.1.8*
(new in apstools release 1.1.8)
"""
table = pyRestTable.Table()
table.labels = ["name", "ophyd structure"]
Expand Down Expand Up @@ -2325,3 +2327,203 @@ def copy_filtered_catalog(source_cat, target_cat, query=None):
)
for key, doc in run.documents():
target_cat.v1.insert(key, doc)


_findpv_registry = None


class PVRegistry:
"""
Cross-reference EPICS PVs with ophyd EpicsSignalBase objects.
"""

def __init__(self, ns=None):
"""
Search ophyd objects for PV or ophyd names.

The rebuild starts with the IPython console namespace
(and defaults to the global namespace if the former
cannot be obtained).

PARAMETERS

ns *dict* or `None`: namespace dictionary
"""
self._pvdb = defaultdict(lambda: defaultdict(list))
self._odb = {}
self._device_name = None
self._known_device_names = []
g = ns or ipython_shell_namespace() or globals()

# kickoff the registration process
logger.debug(
"Cross-referencing EPICS PVs with"
" Python objects and ophyd symbols"
)
self._ophyd_epicsobject_walker(g)

def _ophyd_epicsobject_walker(self, parent):
"""
Walk through the parent object for ophyd Devices & EpicsSignals.

This function is used to rebuild the ``self._pvdb`` object.
"""
if isinstance(parent, dict):
keys = parent.keys()
else:
keys = parent.component_names
_nm_base = self._device_name
for k in keys:
if isinstance(parent, dict):
_nm_base = []
v = self._ref_dict(parent, k)
else:
v = self._ref_object_attribute(parent, k)
if v is None:
continue
# print(k, type(v))
if isinstance(v, ophyd.signal.EpicsSignalBase):
try:
self._signal_processor(v)
self._odb[v.name] = ".".join(self._device_name + [k])
except (KeyError, RuntimeError) as exc:
# FIXME: see issue 509, need to learn how to reproduce
logger.error(
"Exception while examining key '%s': (%s)",
k, exc
)
elif isinstance(v, ophyd.Device):
# print("Device", v.name)
self._device_name = _nm_base + [k]
if v.name not in self._known_device_names:
self._odb[v.name] = ".".join(self._device_name)
self._known_device_names.append(v.name)
self._ophyd_epicsobject_walker(v)

def _ref_dict(self, parent, key):
"""Accessor used by ``_ophyd_epicsobject_walker()``"""
return parent[key]

def _ref_object_attribute(self, parent, key):
"""Accessor used by ``_ophyd_epicsobject_walker()``"""
return getattr(parent, key)

def _register_signal(self, signal, pv, mode):
"""Register a signal with the given mode."""
fdn = full_dotted_name(signal)
if fdn not in self._pvdb[pv][mode]:
self._pvdb[pv][mode].append(fdn)

def _signal_processor(self, signal):
"""Register a signal's read & write PVs."""
self._register_signal(signal, signal._read_pv.pvname, "R")
if hasattr(signal, "_write_pv"):
self._register_signal(signal, signal._write_pv.pvname, "W")

def search_by_mode(self, pvname, mode="R"):
"""Search for PV in specified mode."""
if mode not in ["R", "W"]:
raise ValueError(
f"Incorrect mode given ({mode}." " Must be either `R` or `W`."
)
return self._pvdb[pvname][mode]

def search(self, pvname):
"""Search for PV in both read & write modes."""
return dict(
read=self.search_by_mode(pvname, "R"),
write=self.search_by_mode(pvname, "W"),
)

def ophyd_search(self, oname):
"""Search for ophyd object by ophyd name."""
return self._odb.get(oname)


def _get_pv_registry(force_rebuild, ns):
"""
Check if need to build/rebuild the PV registry.

PARAMETERS

force_rebuild
*bool* :
If ``True``, rebuild the internal registry that maps
EPICS PV names to ophyd objects.
ns
*dict* or `None` :
Namespace dictionary of Python objects.

"""
global _findpv_registry
if _findpv_registry is None or force_rebuild:
_findpv_registry = PVRegistry(ns=ns)
return _findpv_registry


def findname(oname, force_rebuild=False, ns=None):
"""
Find the ophyd (dotted name) object associated with the given ophyd name.

PARAMETERS

oname
*str* :
ophyd name to search
force_rebuild
*bool* :
If ``True``, rebuild the internal registry that maps
ophyd names to ophyd objects.
ns
*dict* or `None` :
Namespace dictionary of Python objects.

RETURNS

str or ``None``:
Name of the ophyd object.

EXAMPLE::

In [45]: findpv("adsimdet_cam_acquire")
prjemian marked this conversation as resolved.
Show resolved Hide resolved
Out[45]: 'adsimdet.cam.acquire'

(new in apstools 1.5.0)
"""
return _get_pv_registry(force_rebuild, ns).ophyd_search(oname)


def findpv(pvname, force_rebuild=False, ns=None):
"""
Find all ophyd objects associated with the given EPICS PV.

PARAMETERS

pvname
*str* :
EPICS PV name to search
force_rebuild
*bool* :
If ``True``, rebuild the internal registry that maps
EPICS PV names to ophyd objects.
ns
*dict* or `None` :
Namespace dictionary of Python objects.

RETURNS

dict or ``None``:
Dictionary of matching ophyd objects, keyed by how the
PV is used by the ophyd signal. The keys are
``read`` and ``write``.

EXAMPLE::

In [45]: findpv("ad:cam1:Acquire")
Out[45]: {'read': [], 'write': ['adsimdet.cam.acquire']}

In [46]: findpv("ad:cam1:Acquire_RBV")
Out[46]: {'read': ['adsimdet.cam.acquire'], 'write': []}

"""
return _get_pv_registry(force_rebuild, ns).search(pvname)
80 changes: 80 additions & 0 deletions tests/debug_findpv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
examine the execution of utils.findpv()

needs a connection to PVs, of course
"""

import apstools.utils
import ophyd
import ophyd.areadetector.filestore_mixins
import ophyd.areadetector.plugins
import os
import sys

sys.path.append(
os.path.abspath(
os.path.join(__file__, "..")
)
)

AD_IOC_PREFIX = "ad:"
AD_IOC_FILES_ROOT = "/"
BLUESKY_FILES_ROOT = "/tmp/docker_ioc/iocad"
IOC_IMAGE_DIR = "/tmp/images/"
AD_IOC_PATH = os.path.join(
AD_IOC_FILES_ROOT,
IOC_IMAGE_DIR.lstrip("/")
)
BLUESKY_PATH = os.path.join(
BLUESKY_FILES_ROOT,
IOC_IMAGE_DIR.lstrip("/")
)


class MyCam(ophyd.SimDetectorCam):
pool_max_buffers = None


class MyHDF5Plugin(
ophyd.areadetector.filestore_mixins.FileStoreHDF5IterativeWrite,
ophyd.areadetector.plugins.HDF5Plugin_V34
):
pass


class MyFixedImagePlugin(ophyd.ImagePlugin):
pool_max_buffers = None


class MyDetector(ophyd.SimDetector):
cam = ophyd.ADComponent(MyCam, "cam1:")
hdf1 = ophyd.ADComponent(
MyHDF5Plugin,
"HDF1:",
write_path_template=AD_IOC_PATH,
read_path_template=BLUESKY_PATH,
)
image = ophyd.ADComponent(MyFixedImagePlugin, "image1:")


def main():
gpm1 = ophyd.EpicsMotor("gp:m1", name="gpm1")
simdet = MyDetector("ad:", name="simdet")
gpm1.wait_for_connection()
simdet.wait_for_connection()
ns = dict(m1=gpm1, simdet=simdet)
print(f'{apstools.utils.findpv("ad:image1:ArrayData", ns=ns) = }')
print(f'{apstools.utils.findpv("ad:HDF1:FilePath_RBV", ns=ns) = }')
print(f'{apstools.utils.findpv("ad:cam1:Acquire", ns=ns) = }')
print(f'{apstools.utils.findpv("gp:m1.RBV", ns=ns) = }')

print(f'{apstools.utils.findname(gpm1.user_setpoint.name, ns=ns) = }')
print(f'{apstools.utils.findname("m1_user_setpoint", ns=ns) = }')
print(f'{apstools.utils.findname(simdet.cam.acquire.name, ns=ns) = }')
print(f'{apstools.utils.findname("simdet_hdf1_array_size", ns=ns) = }')
print(f'{apstools.utils.findname("simdet_image_dim1_sa", ns=ns) = }')
print(f'{apstools.utils.findname("simdet_cam_peak_width_peak_width_y", ns=ns) = }')


if __name__ == "__main__":
main()