Skip to content

Commit

Permalink
Merge pull request #497 from BCDA-APS/496_find_EPICS_PV
Browse files Browse the repository at this point in the history
add findpv(PVNAME) utility function
  • Loading branch information
prjemian authored Apr 1, 2021
2 parents f546649 + 07517dd commit 6c00530
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 3 deletions.
208 changes: 205 additions & 3 deletions apstools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
~ExcelDatabaseFileBase
~ExcelDatabaseFileGeneric
~ExcelReadError
~findname
~findpv
~findCatalogsInNamespace
~full_dotted_name
~getCatalog
Expand Down Expand Up @@ -863,7 +865,7 @@ def listruns_v1_4(
Out[100]: <pyRestTable.rest_table.Table at 0x7f004e4d4898>
*new in apstools release 1.1.10*
(new in apstools release 1.1.10)
"""
warnings.warn(
"DEPRECATED: listruns_v1_4() will be removed"
Expand Down Expand Up @@ -1078,7 +1080,7 @@ def replay(headers, callback=None, sort=True):
Sort the headers chronologically if True.
(default:``True``)
*new in apstools release 1.1.11*
(new in apstools release 1.1.11)
"""
callback = callback or ipython_shell_namespace().get(
"bec", # get from IPython shell
Expand Down Expand Up @@ -1225,7 +1227,7 @@ def listobjects(show_pv=True, printing=True, verbose=False, symbols=None):
In [2]:
*new in apstools release 1.1.8*
(new in apstools release 1.1.8)
"""
table = pyRestTable.Table()
table.labels = ["name", "ophyd structure"]
Expand Down Expand Up @@ -2325,3 +2327,203 @@ def copy_filtered_catalog(source_cat, target_cat, query=None):
)
for key, doc in run.documents():
target_cat.v1.insert(key, doc)


_findpv_registry = None


class PVRegistry:
"""
Cross-reference EPICS PVs with ophyd EpicsSignalBase objects.
"""

def __init__(self, ns=None):
"""
Search ophyd objects for PV or ophyd names.
The rebuild starts with the IPython console namespace
(and defaults to the global namespace if the former
cannot be obtained).
PARAMETERS
ns *dict* or `None`: namespace dictionary
"""
self._pvdb = defaultdict(lambda: defaultdict(list))
self._odb = {}
self._device_name = None
self._known_device_names = []
g = ns or ipython_shell_namespace() or globals()

# kickoff the registration process
logger.debug(
"Cross-referencing EPICS PVs with"
" Python objects and ophyd symbols"
)
self._ophyd_epicsobject_walker(g)

def _ophyd_epicsobject_walker(self, parent):
"""
Walk through the parent object for ophyd Devices & EpicsSignals.
This function is used to rebuild the ``self._pvdb`` object.
"""
if isinstance(parent, dict):
keys = parent.keys()
else:
keys = parent.component_names
_nm_base = self._device_name
for k in keys:
if isinstance(parent, dict):
_nm_base = []
v = self._ref_dict(parent, k)
else:
v = self._ref_object_attribute(parent, k)
if v is None:
continue
# print(k, type(v))
if isinstance(v, ophyd.signal.EpicsSignalBase):
try:
self._signal_processor(v)
self._odb[v.name] = ".".join(self._device_name + [k])
except (KeyError, RuntimeError) as exc:
# FIXME: see issue 509, need to learn how to reproduce
logger.error(
"Exception while examining key '%s': (%s)",
k, exc
)
elif isinstance(v, ophyd.Device):
# print("Device", v.name)
self._device_name = _nm_base + [k]
if v.name not in self._known_device_names:
self._odb[v.name] = ".".join(self._device_name)
self._known_device_names.append(v.name)
self._ophyd_epicsobject_walker(v)

def _ref_dict(self, parent, key):
"""Accessor used by ``_ophyd_epicsobject_walker()``"""
return parent[key]

def _ref_object_attribute(self, parent, key):
"""Accessor used by ``_ophyd_epicsobject_walker()``"""
return getattr(parent, key)

def _register_signal(self, signal, pv, mode):
"""Register a signal with the given mode."""
fdn = full_dotted_name(signal)
if fdn not in self._pvdb[pv][mode]:
self._pvdb[pv][mode].append(fdn)

def _signal_processor(self, signal):
"""Register a signal's read & write PVs."""
self._register_signal(signal, signal._read_pv.pvname, "R")
if hasattr(signal, "_write_pv"):
self._register_signal(signal, signal._write_pv.pvname, "W")

def search_by_mode(self, pvname, mode="R"):
"""Search for PV in specified mode."""
if mode not in ["R", "W"]:
raise ValueError(
f"Incorrect mode given ({mode}." " Must be either `R` or `W`."
)
return self._pvdb[pvname][mode]

def search(self, pvname):
"""Search for PV in both read & write modes."""
return dict(
read=self.search_by_mode(pvname, "R"),
write=self.search_by_mode(pvname, "W"),
)

def ophyd_search(self, oname):
"""Search for ophyd object by ophyd name."""
return self._odb.get(oname)


def _get_pv_registry(force_rebuild, ns):
"""
Check if need to build/rebuild the PV registry.
PARAMETERS
force_rebuild
*bool* :
If ``True``, rebuild the internal registry that maps
EPICS PV names to ophyd objects.
ns
*dict* or `None` :
Namespace dictionary of Python objects.
"""
global _findpv_registry
if _findpv_registry is None or force_rebuild:
_findpv_registry = PVRegistry(ns=ns)
return _findpv_registry


def findname(oname, force_rebuild=False, ns=None):
"""
Find the ophyd (dotted name) object associated with the given ophyd name.
PARAMETERS
oname
*str* :
ophyd name to search
force_rebuild
*bool* :
If ``True``, rebuild the internal registry that maps
ophyd names to ophyd objects.
ns
*dict* or `None` :
Namespace dictionary of Python objects.
RETURNS
str or ``None``:
Name of the ophyd object.
EXAMPLE::
In [45]: findname("adsimdet_cam_acquire")
Out[45]: 'adsimdet.cam.acquire'
(new in apstools 1.5.0)
"""
return _get_pv_registry(force_rebuild, ns).ophyd_search(oname)


def findpv(pvname, force_rebuild=False, ns=None):
"""
Find all ophyd objects associated with the given EPICS PV.
PARAMETERS
pvname
*str* :
EPICS PV name to search
force_rebuild
*bool* :
If ``True``, rebuild the internal registry that maps
EPICS PV names to ophyd objects.
ns
*dict* or `None` :
Namespace dictionary of Python objects.
RETURNS
dict or ``None``:
Dictionary of matching ophyd objects, keyed by how the
PV is used by the ophyd signal. The keys are
``read`` and ``write``.
EXAMPLE::
In [45]: findpv("ad:cam1:Acquire")
Out[45]: {'read': [], 'write': ['adsimdet.cam.acquire']}
In [46]: findpv("ad:cam1:Acquire_RBV")
Out[46]: {'read': ['adsimdet.cam.acquire'], 'write': []}
"""
return _get_pv_registry(force_rebuild, ns).search(pvname)
80 changes: 80 additions & 0 deletions tests/debug_findpv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
examine the execution of utils.findpv()
needs a connection to PVs, of course
"""

import apstools.utils
import ophyd
import ophyd.areadetector.filestore_mixins
import ophyd.areadetector.plugins
import os
import sys

sys.path.append(
os.path.abspath(
os.path.join(__file__, "..")
)
)

AD_IOC_PREFIX = "ad:"
AD_IOC_FILES_ROOT = "/"
BLUESKY_FILES_ROOT = "/tmp/docker_ioc/iocad"
IOC_IMAGE_DIR = "/tmp/images/"
AD_IOC_PATH = os.path.join(
AD_IOC_FILES_ROOT,
IOC_IMAGE_DIR.lstrip("/")
)
BLUESKY_PATH = os.path.join(
BLUESKY_FILES_ROOT,
IOC_IMAGE_DIR.lstrip("/")
)


class MyCam(ophyd.SimDetectorCam):
pool_max_buffers = None


class MyHDF5Plugin(
ophyd.areadetector.filestore_mixins.FileStoreHDF5IterativeWrite,
ophyd.areadetector.plugins.HDF5Plugin_V34
):
pass


class MyFixedImagePlugin(ophyd.ImagePlugin):
pool_max_buffers = None


class MyDetector(ophyd.SimDetector):
cam = ophyd.ADComponent(MyCam, "cam1:")
hdf1 = ophyd.ADComponent(
MyHDF5Plugin,
"HDF1:",
write_path_template=AD_IOC_PATH,
read_path_template=BLUESKY_PATH,
)
image = ophyd.ADComponent(MyFixedImagePlugin, "image1:")


def main():
gpm1 = ophyd.EpicsMotor("gp:m1", name="gpm1")
simdet = MyDetector("ad:", name="simdet")
gpm1.wait_for_connection()
simdet.wait_for_connection()
ns = dict(m1=gpm1, simdet=simdet)
print(f'{apstools.utils.findpv("ad:image1:ArrayData", ns=ns) = }')
print(f'{apstools.utils.findpv("ad:HDF1:FilePath_RBV", ns=ns) = }')
print(f'{apstools.utils.findpv("ad:cam1:Acquire", ns=ns) = }')
print(f'{apstools.utils.findpv("gp:m1.RBV", ns=ns) = }')

print(f'{apstools.utils.findname(gpm1.user_setpoint.name, ns=ns) = }')
print(f'{apstools.utils.findname("m1_user_setpoint", ns=ns) = }')
print(f'{apstools.utils.findname(simdet.cam.acquire.name, ns=ns) = }')
print(f'{apstools.utils.findname("simdet_hdf1_array_size", ns=ns) = }')
print(f'{apstools.utils.findname("simdet_image_dim1_sa", ns=ns) = }')
print(f'{apstools.utils.findname("simdet_cam_peak_width_peak_width_y", ns=ns) = }')


if __name__ == "__main__":
main()

0 comments on commit 6c00530

Please sign in to comment.