Skip to content

Commit

Permalink
Merge pull request #779 from Hugovdberg/github778
Browse files Browse the repository at this point in the history
fix: github issue #778
  • Loading branch information
Hugovdberg authored Jul 23, 2024
2 parents 7dcec64 + 2bec554 commit 6cf55c1
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 31 deletions.
38 changes: 19 additions & 19 deletions PIconnect/PIAF.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import dataclasses
import warnings
from typing import Any, Dict, List, Optional, Union, cast
from typing import Any, Optional, Union, cast

import pandas as pd

Expand All @@ -18,18 +18,18 @@ class PIAFServer:
"""Reference to a PI AF server and its databases."""

server: AF.PISystem
databases: Dict[str, AF.AFDatabase] = dataclasses.field(default_factory=dict)
databases: dict[str, AF.AFDatabase] = dataclasses.field(default_factory=dict)

def __getitem__(self, attr: str) -> Union[AF.PISystem, Dict[str, AF.AFDatabase]]:
def __getitem__(self, attr: str) -> Union[AF.PISystem, dict[str, AF.AFDatabase]]:
"""Allow access to attributes as if they were dictionary items."""
return getattr(self, attr)


ServerSpec = Dict[str, Union[AF.PISystem, Dict[str, AF.AFDatabase]]]
ServerSpec = dict[str, Union[AF.PISystem, dict[str, AF.AFDatabase]]]


def _lookup_servers() -> Dict[str, ServerSpec]:
servers: Dict[str, PIAFServer] = {}
def _lookup_servers() -> dict[str, ServerSpec]:
servers: dict[str, PIAFServer] = {}
for s in AF.PISystems():
try:
servers[s.Name] = server = PIAFServer(s)
Expand Down Expand Up @@ -74,7 +74,7 @@ class PIAFDatabase(object):

version = "0.3.0"

servers: Dict[str, ServerSpec] = _lookup_servers()
servers: dict[str, ServerSpec] = _lookup_servers()
default_server: Optional[ServerSpec] = _lookup_default_server()

def __init__(self, server: Optional[str] = None, database: Optional[str] = None) -> None:
Expand Down Expand Up @@ -106,7 +106,7 @@ def _initialise_database(
if database is None:
return default_db

databases = cast(Dict[str, AF.AFDatabase], server["databases"])
databases = cast(dict[str, AF.AFDatabase], server["databases"])
if database not in databases:
message = 'Database "{database}" not found, using the default database.'
warnings.warn(
Expand Down Expand Up @@ -143,20 +143,20 @@ def database_name(self) -> str:
return self.database.Name

@property
def children(self) -> Dict[str, "PIAFElement"]:
def children(self) -> dict[str, "PIAFElement"]:
"""Return a dictionary of the direct child elements of the database."""
return {c.Name: PIAFElement(c) for c in self.database.Elements}

@property
def tables(self) -> Dict[str, "PIAFTable"]:
def tables(self) -> dict[str, "PIAFTable"]:
"""Return a dictionary of the tables in the database."""
return {t.Name: PIAFTable(t) for t in self.database.Tables}

def descendant(self, path: str) -> "PIAFElement":
"""Return a descendant of the database from an exact path."""
return PIAFElement(self.database.Elements.get_Item(path))

def search(self, query: Union[str, List[str]]) -> List[PIAFAttribute.PIAFAttribute]:
def search(self, query: Union[str, list[str]]) -> list[PIAFAttribute.PIAFAttribute]:
"""Search PIAFAttributes by element|attribute path strings.
Return a list of PIAFAttributes directly from a list of element|attribute path strings
Expand All @@ -167,8 +167,8 @@ def search(self, query: Union[str, List[str]]) -> List[PIAFAttribute.PIAFAttribu
"BaseElement/childElement/childElement|Attribute|ChildAttribute|ChildAttribute")
"""
attributelist: List[PIAFAttribute.PIAFAttribute] = []
if isinstance(query, List):
attributelist: list[PIAFAttribute.PIAFAttribute] = []
if isinstance(query, list):
return [y for x in query for y in self.search(x)]
if "|" in query:
splitpath = query.split("|")
Expand All @@ -177,7 +177,7 @@ def search(self, query: Union[str, List[str]]) -> List[PIAFAttribute.PIAFAttribu
if len(splitpath) > 2:
for x in range(len(splitpath) - 2):
attribute = attribute.children[splitpath[x + 2]]
attributelist.append(attribute)
attributelist.append(attribute)
return attributelist

def event_frames(
Expand All @@ -187,7 +187,7 @@ def event_frames(
max_count: int = 1000,
search_mode: PIConsts.EventFrameSearchMode = _DEFAULT_EVENTFRAME_SEARCH_MODE,
search_full_hierarchy: bool = False,
) -> Dict[str, "PIAFEventFrame"]:
) -> dict[str, "PIAFEventFrame"]:
"""Search for event frames in the database."""
_start_time = _time.to_af_time(start_time)
_search_mode = AF.EventFrame.AFEventFrameSearchMode(int(search_mode))
Expand Down Expand Up @@ -222,7 +222,7 @@ def parent(self) -> Optional["PIAFElement"]:
return self.__class__(self.element.Parent)

@property
def children(self) -> Dict[str, "PIAFElement"]:
def children(self) -> dict[str, "PIAFElement"]:
"""Return a dictionary of the direct child elements of the current element."""
return {c.Name: self.__class__(c) for c in self.element.Elements}

Expand All @@ -249,7 +249,7 @@ def parent(self) -> Optional["PIAFEventFrame"]:
return self.__class__(self.element.Parent)

@property
def children(self) -> Dict[str, "PIAFEventFrame"]:
def children(self) -> dict[str, "PIAFEventFrame"]:
"""Return a dictionary of the direct child event frames of the current event frame."""
return {c.Name: self.__class__(c) for c in self.element.EventFrames}

Expand All @@ -261,12 +261,12 @@ def __init__(self, table: AF.Asset.AFTable) -> None:
self._table = table

@property
def columns(self) -> List[str]:
def columns(self) -> list[str]:
"""Return the names of the columns in the table."""
return [col.ColumnName for col in self._table.Table.Columns]

@property
def _rows(self) -> List[System.Data.DataRow]:
def _rows(self) -> list[System.Data.DataRow]:
return self._table.Table.Rows

@property
Expand Down
51 changes: 39 additions & 12 deletions PIconnect/_typing/Asset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Mock classes for the AF module."""

from typing import List, Optional, Union
from collections.abc import Iterator
from typing import Optional, Union, cast

from . import AF, Data, Generic
from . import UnitsOfMeasure as UOM
Expand All @@ -22,33 +23,56 @@


class AFAttribute:
def __init__(self, name: str) -> None:
def __init__(self, name: str, parent: Optional["AFAttribute"] = None) -> None:
self.Attributes: AFAttributes
if parent is None:
self.Attributes = AFAttributes(
[
AFAttribute("Attribute1", parent=self),
AFAttribute("Attribute2", parent=self),
]
)
self.Data: Data.AFData
self.DataReference: AFDataReference
self.Description: str
self.DefaultUOM: UOM.UOM
self.Description: str = f"Description of {name}"
self.DefaultUOM = UOM.UOM()
self.Name = name
self.Parent: Optional[AFAttribute]
self.Parent = parent

@staticmethod
def GetValue() -> AFValue:
"""Stub for getting a value."""
return AFValue(0)


class AFAttributes(List[AFAttribute]):
def __init__(self, elements: List[AFAttribute]) -> None:
class AFAttributes(list[AFAttribute]):
def __init__(self, elements: list[AFAttribute]) -> None:
self.Count: int
self._values = elements

def __iter__(self) -> Iterator[AFAttribute]:
yield from self._values


class AFBaseElement:
def __init__(self, name: str, parent: Optional["AFElement"] = None) -> None:
self.Attributes: AFAttributes
self.Attributes = AFAttributes(
[
AFAttribute("Attribute1"),
AFAttribute("Attribute2"),
]
)
self.Categories: AF.AFCategories
self.Description: str
self.Elements: AFElements
if parent is None:
self.Elements = AFElements(
[
AFElement("Element1", parent=cast(AFElement, self)),
AFElement("Element2", parent=cast(AFElement, self)),
AFElement("BaseElement", parent=cast(AFElement, self)),
]
)
self.Name = name
self.Parent = parent

Expand All @@ -57,8 +81,8 @@ class AFElement(AFBaseElement):
"""Mock class of the AF.AFElement class."""


class AFElements(List[AFElement]):
def __init__(self, elements: List[AFElement]) -> None:
class AFElements(list[AFElement]):
def __init__(self, elements: list[AFElement]) -> None:
self.Count: int
self._values = elements

Expand All @@ -68,6 +92,9 @@ def get_Item(self, name: Union[str, int]) -> AFElement:
return self._values[name]
return AFElement(name)

def __iter__(self) -> Iterator[AFElement]:
yield from self._values


class AFElementTemplate:
"""Mock class of the AF.Asset.AFElementTemplate class."""
Expand All @@ -90,8 +117,8 @@ def __init__(self, name: str) -> None:
self.Table: System.Data.DataTable


class AFTables(List[AFTable]):
def __init__(self, elements: List[AFTable]) -> None:
class AFTables(list[AFTable]):
def __init__(self, elements: list[AFTable]) -> None:
self.Count: int
self._values = elements

Expand Down
19 changes: 19 additions & 0 deletions tests/test_PIAF.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@
import pytest

import PIconnect as PI
import PIconnect.AFSDK as AFSDK
import PIconnect.PIAF as PIAF
from PIconnect._typing import AF

AFSDK.AF, AFSDK.System, AFSDK.AF_SDK_VERSION = AFSDK.__fallback()
PI.AF = PIAF.AF = AFSDK.AF
PI.PIAFDatabase.servers = PIAF._lookup_servers()
PI.PIAFDatabase.default_server = PIAF._lookup_default_server()


class TestAFDatabase:
"""Test connecting to the AF database."""
Expand Down Expand Up @@ -57,3 +64,15 @@ def test_search(self):
with PI.PIAFDatabase() as db:
attributes = db.search([r"", r""])
assert isinstance(attributes, list)

def test_split_element_attribute(self):
"""Test that calling attributes on the database returns a list of attributes."""
with PI.PIAFDatabase() as db:
attributes = db.search(r"BaseElement|Attribute1")
assert attributes[0].name == "Attribute1"

def test_split_element_nested_attribute(self):
"""Test that calling attributes on the database returns a list of attributes."""
with PI.PIAFDatabase() as db:
attributes = db.search(r"BaseElement|Attribute1|Attribute2")
assert attributes[0].name == "Attribute2"

0 comments on commit 6cf55c1

Please sign in to comment.