Skip to content

Commit

Permalink
feat: remove unalias function
Browse files Browse the repository at this point in the history
  • Loading branch information
mkundu1 committed Nov 26, 2024
1 parent 18084aa commit 186d9b0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 157 deletions.
184 changes: 64 additions & 120 deletions src/ansys/fluent/core/solver/flobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,33 @@ def to_python_name(fluent_name: str) -> str:
return name


def _get_python_path_comps(python_path: str):
"""Get python path components for traversing class hierarchy."""
python_path = python_path.removeprefix("<session>")
python_path = python_path.removeprefix(".settings")
python_path = python_path.removeprefix(".")
comps = python_path.split(".")
comps = [comp.split("[")[0] for comp in comps]
return comps


def _get_alias_class(root_cls, path: list[str], alias_path: list[str]):
"""Get the class for the given alias path."""
parent_count = 0
while alias_path[0] == "..":
parent_count += 1
alias_path.pop(0)
for _ in range(parent_count):
path.pop()
path = path + alias_path
cls = root_cls
for comp in path:
cls = cls._child_classes[comp]
if issubclass(cls, (NamedObject, ListObject)):
cls = cls.child_object_type
return cls, path


class Base:
"""Provides the base class for settings and command objects.
Expand Down Expand Up @@ -646,43 +673,6 @@ def _create_child(cls, name, parent: weakref.CallableProxyType, alias_path=None)
return cls(name, parent)


def _combine_set_states(states: List[Tuple[str, StateT]]) -> Tuple[str, StateT]:
"""Combines multiple set-states into a single set-state at a common parent path.
Parameters
----------
states : list[tuple[str, StateT]]
List of (<path>, <state>) tuples.
Returns
-------
tuple[str, StateT]
Common parent path, combined state.
"""
paths, _ = zip(*states)
common_path = []
paths = [path.split("/") for path in paths]
for comps in zip(*paths):
if len(set(comps)) == 1:
common_path.append(comps[0])
else:
break
combined_state = {}
for path, state in states:
comps = path.split("/")
comps = comps[len(common_path) :]
if comps:
if not isinstance(combined_state, dict):
combined_state = {}
obj = combined_state
for comp in comps[:-1]:
obj = obj.setdefault(comp, {})
obj[comps[-1]] = state
else:
combined_state = state
return "/".join(common_path), combined_state


class SettingsBase(Base, Generic[StateT]):
"""Base class for settings objects.
Expand All @@ -696,7 +686,7 @@ class SettingsBase(Base, Generic[StateT]):
"""

@classmethod
def to_scheme_keys(cls, value: StateT) -> StateT:
def to_scheme_keys(cls, value: StateT, root_cls, path: list[str]) -> StateT:
"""Convert value to have keys with scheme names.
This is overridden in the ``Group``, ``NamedObject``, and
Expand All @@ -721,63 +711,6 @@ def get_state(self) -> StateT:
"""Get the state of the object."""
return self.to_python_keys(self.flproxy.get_var(self.path))

# Following is not a classmethod, as parent (required to support ".." in alias-path)
# is available only at the instance level.
def _unalias(self, cls, value):
"""Unalias the given value."""
if isinstance(value, collections.abc.Mapping):
ret = {}
outer_set_states = []
for k, v in value.items():
if hasattr(cls, "_child_aliases") and k in cls._child_aliases:
alias, _ = cls._child_aliases[k]
comps = alias.split("/")
if comps[0] == "..":
outer_obj = self
while comps[0] == "..":
outer_obj = outer_obj.parent
comps = comps[1:]
for comp in comps:
try:
outer_obj = getattr(outer_obj, comp)
except InactiveObjectError:
outer_obj = super(
SettingsBase, outer_obj
).__getattribute__(comp)
outer_set_states.append((outer_obj, v))
else:
ret_alias = ret
aliased_cls = cls
obj = self
for i, comp in enumerate(comps):
aliased_cls = aliased_cls._child_classes[comp]
try:
obj = getattr(obj, comp)
except InactiveObjectError:
obj = super(SettingsBase, obj).__getattribute__(comp)
if i == len(comps) - 1:
ret_alias[comp], o_set_states = obj._unalias(
aliased_cls, v
)
outer_set_states.extend(o_set_states)
else:
ret_alias = ret_alias.setdefault(comp, {})
else:
if issubclass(cls, Group):
ccls = cls._child_classes[k]
try:
cobj = getattr(self, k)
except InactiveObjectError:
cobj = super(SettingsBase, self).__getattribute__(k)
ret[k], o_set_states = cobj._unalias(ccls, v)
outer_set_states.extend(o_set_states)
else:
ret[k], o_set_states = self._unalias(cls, v)
outer_set_states.extend(o_set_states)
return ret, outer_set_states
else:
return value, []

def set_state(self, state: StateT | None = None, **kwargs):
"""Set the state of the object."""
with self._while_setting_state():
Expand All @@ -786,17 +719,14 @@ def set_state(self, state: StateT | None = None, **kwargs):
):
self.value.set_state(state, **kwargs)
else:
state, outer_set_states = self._unalias(self.__class__, kwargs or state)
if outer_set_states:
set_states = []
if state:
set_states.append((self.path, self.to_scheme_keys(state)))
for obj, state in outer_set_states:
set_states.append((obj.path, obj.to_scheme_keys(state)))
path, state = _combine_set_states(set_states)
self.flproxy.set_var(path, state)
else:
self.flproxy.set_var(self.path, self.to_scheme_keys(state))
self.flproxy.set_var(
self.path,
self.to_scheme_keys(
kwargs or state,
self._root.__class__,
_get_python_path_comps(self.python_path),
),
)

@staticmethod
def _print_state_helper(state, out, indent=0, indent_factor=2):
Expand Down Expand Up @@ -1022,7 +952,7 @@ def __call__(self, *args, **kwargs):
return self.get_state()

@classmethod
def to_scheme_keys(cls, value):
def to_scheme_keys(cls, value, root_cls, path: list[str]):
"""Convert value to have keys with scheme names.
Raises
Expand All @@ -1035,7 +965,15 @@ def to_scheme_keys(cls, value):
for k, v in value.items():
if k in cls.child_names:
ccls = cls._child_classes[k]
ret[ccls.fluent_name] = ccls.to_scheme_keys(v)
ret[ccls.fluent_name] = ccls.to_scheme_keys(v, root_cls, path + [k])
elif k in cls._child_aliases:
alias, scm_alias_name = cls._child_aliases[k]
alias_cls, alias_path = _get_alias_class(
root_cls, path.copy(), alias.split("/")
)
ret[scm_alias_name] = alias_cls.to_scheme_keys(
v, root_cls, alias_path
)
else:
raise RuntimeError("Key '" + str(k) + "' is invalid")
return ret
Expand Down Expand Up @@ -1243,18 +1181,14 @@ def __iter__(self):
# get_state example: a.b["*"].c.d.get_state() == {"<bN>" {"c": {"d": <d_value>}}}
# set_state example: a.b["*"].set_state({"c": {"d": <d_value>}})

def to_scheme_keys(self, value):
def to_scheme_keys(self, value, root_cls, path):
"""Convert value to have keys with scheme names."""
return self._settings_cls.to_scheme_keys(value)
return self._settings_cls.to_scheme_keys(value, root_cls, path)

def to_python_keys(self, value):
"""Convert value to have keys with Python names."""
return self._state_cls.to_python_keys(value)

def _unalias(self, cls, value):
# Not yet implemented
return value, []


class NamedObjectWildcardPath(WildcardPath):
"""WildcardPath at a NamedObject path, so it can be looked up by wildcard again."""
Expand Down Expand Up @@ -1306,12 +1240,12 @@ def __init__(self, name: str | None = None, parent=None):
)

@classmethod
def to_scheme_keys(cls, value):
def to_scheme_keys(cls, value, root_cls, path: list[str]):
"""Convert value to have keys with scheme names."""
if isinstance(value, collections.abc.Mapping):
ret = {}
for k, v in value.items():
ret[k] = cls.child_object_type.to_scheme_keys(v)
ret[k] = cls.child_object_type.to_scheme_keys(v, root_cls, path)
return ret
else:
return value
Expand Down Expand Up @@ -1518,10 +1452,12 @@ def __init__(self, name=None, parent=None):
self._setattr(query, _create_child(cls, None, self))

@classmethod
def to_scheme_keys(cls, value):
def to_scheme_keys(cls, value, root_cls, path: list[str]):
"""Convert value to have keys with scheme names."""
if isinstance(value, collections.abc.Sequence):
return [cls.child_object_type.to_scheme_keys(v) for v in value]
return [
cls.child_object_type.to_scheme_keys(v, root_cls, path) for v in value
]
else:
return value

Expand Down Expand Up @@ -1719,7 +1655,11 @@ def execute_command(self, *args, **kwds):
command_name=self.python_name, value=value, kwargs=kwds
)
# Convert key-value to Scheme key-value
scmKwds[argument.fluent_name] = argument.to_scheme_keys(value)
scmKwds[argument.fluent_name] = argument.to_scheme_keys(
value,
argument._root.__class__,
_get_python_path_comps(argument.python_path),
)
ret = self._execute_command(*args, **scmKwds)
for arg, value in kwds.items():
argument = getattr(self, arg)
Expand Down Expand Up @@ -1805,7 +1745,11 @@ def __call__(self, **kwds):
for arg, value in kwds.items():
argument = getattr(self, arg)
# Convert key-value to Scheme key-value
scmKwds[argument.fluent_name] = argument.to_scheme_keys(value)
scmKwds[argument.fluent_name] = argument.to_scheme_keys(
value,
argument._root.__class__,
_get_python_path_comps(argument.python_path),
)
return self.flproxy.execute_query(self._parent.path, self.obj_name, **scmKwds)


Expand Down
37 changes: 0 additions & 37 deletions tests/test_flobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from ansys.fluent.core.solver import flobject
from ansys.fluent.core.solver.flobject import (
InactiveObjectError,
_combine_set_states,
_gethash,
find_children,
)
Expand Down Expand Up @@ -1239,39 +1238,3 @@ def test_default_argument_names_for_commands(static_mixer_settings_session):
assert solver.results.graphics.contour.delete.argument_names == ["name_list"]
# The following is the default behavior when no arguments are associated with the command.
assert solver.results.graphics.contour.list.argument_names == []


def test_combine_set_states():
assert _combine_set_states(
[
("A/B/C", 1),
]
) == ("A/B/C", 1)

assert _combine_set_states(
[
("A/B/C", 1),
("A/B/C", 2),
]
) == ("A/B/C", 2)

assert _combine_set_states(
[
("A/B/C", 1),
("A/B/C", {"X": 2}),
]
) == ("A/B/C", {"X": 2})

assert _combine_set_states(
[
("A/B/C", 1),
("A/B/D", 2),
]
) == ("A/B", {"C": 1, "D": 2})

assert _combine_set_states(
[
("A/B/C", {"X": 1}),
("A/B/D/E", 2),
]
) == ("A/B", {"C": {"X": 1}, "D": {"E": 2}})

0 comments on commit 186d9b0

Please sign in to comment.