Skip to content

Commit

Permalink
refactor: Data model cache refactoring (#3527)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpohekar authored Nov 27, 2024
1 parent bf4f01d commit 8396888
Showing 1 changed file with 127 additions and 66 deletions.
193 changes: 127 additions & 66 deletions src/ansys/fluent/core/data_model_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import copy
from enum import Enum
from threading import RLock
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from ansys.api.fluent.v0.variant_pb2 import Variant
from ansys.fluent.core.utils.fluent_version import FluentVersion
Expand Down Expand Up @@ -206,28 +206,34 @@ def _update_cache_from_variant_state(
source: Dict[str, StateType],
key: str,
state: Variant,
updaterFn,
updater_fn,
rules_str: str,
version,
):
if state.HasField("bool_state"):
updaterFn(source, key, state.bool_state)
elif state.HasField("int64_state"):
updaterFn(source, key, state.int64_state)
elif state.HasField("double_state"):
updaterFn(source, key, state.double_state)
elif state.HasField("string_state"):
updaterFn(source, key, state.string_state)
elif state.HasField("bool_vector_state"):
updaterFn(source, key, state.bool_vector_state.item)
elif state.HasField("int64_vector_state"):
updaterFn(source, key, state.int64_vector_state.item)
elif state.HasField("double_vector_state"):
updaterFn(source, key, state.double_vector_state.item)
elif state.HasField("string_vector_state"):
updaterFn(source, key, state.string_vector_state.item)
elif state.HasField("variant_vector_state"):
updaterFn(source, key, [])
# Helper function to update the source with the state value
def update_source_with_state(state_field):
if state.HasField(state_field):
updater_fn(source, key, getattr(state, state_field))
return True
return False

# Check for basic state types
for state_type in [
"bool_state",
"int64_state",
"double_state",
"string_state",
"bool_vector_state",
"int64_vector_state",
"double_vector_state",
"string_vector_state",
]:
if update_source_with_state(state_type):
return

# Handle variant vector state
if state.HasField("variant_vector_state"):
updater_fn(source, key, [])
for item in state.variant_vector_state.item:
self._update_cache_from_variant_state(
rules,
Expand All @@ -238,34 +244,28 @@ def _update_cache_from_variant_state(
rules_str + "/" + key.split(":", maxsplit=1)[0],
version,
)
elif state.HasField("variant_map_state"):
return

# Handle variant map state
if state.HasField("variant_map_state"):
internal_names_as_keys = (
self.get_config(rules, "name_key") == NameKey.INTERNAL
)

# Determine the appropriate key
if ":" in key:
type_, iname = key.split(":", maxsplit=1)
for k1, v1 in source.items():
if (internal_names_as_keys and k1 == key) or (
(not internal_names_as_keys)
and isinstance(v1, dict)
and v1.get(NameKey.INTERNAL.value) == iname
):
key = k1
break
else: # new named object
if internal_names_as_keys:
source[key] = {}
else:
name = state.variant_map_state.item[
NameKey.DISPLAY.value
].string_state
key = f"{type_}:{name}"
source[key] = {NameKey.INTERNAL.value: iname}
key = self._determine_key(
source, internal_names_as_keys, key, state, type_, iname
)
else:
if key not in source:
source[key] = {}

if version and _is_dict_parameter_type(version, rules, rules_str):
source[key] = {}

# Update the source with items from the variant map state
if state.variant_map_state.item:
source = source[key]
for k, v in state.variant_map_state.item.items():
Expand All @@ -280,8 +280,40 @@ def _update_cache_from_variant_state(
)
else:
source[key] = {}

# Default case when no fields are matched
else:
updaterFn(source, key, None)
updater_fn(source, key, None)

def _determine_key(
self,
source: Dict[str, StateType],
internal_names_as_keys: bool,
key: str,
state: Variant,
type_: str,
iname: str,
) -> str:
"""Determine the appropriate key based on internal naming conventions."""
for k1, v1 in source.items():
if (internal_names_as_keys and k1 == key) or (
(not internal_names_as_keys)
and isinstance(v1, dict)
and v1.get(NameKey.INTERNAL.value) == iname
):
return k1 # Found a matching key

# If no match found and internal naming is used
if internal_names_as_keys:
source[key] = {}
return key

# If no match found and external naming is used
name = state.variant_map_state.item[NameKey.DISPLAY.value].string_state
new_key = f"{type_}:{name}"
source[new_key] = {NameKey.INTERNAL.value: iname}

return new_key

def update_cache(
self, rules: str, state: Variant, deleted_paths: List[str], version=None
Expand All @@ -300,37 +332,16 @@ def update_cache(
Fluent version
"""
cache = self.rules_str_to_cache[rules]

with self._with_lock(rules):
internal_names_as_keys = (
self.get_config(rules, "name_key") == NameKey.INTERNAL
)
for deleted_path in deleted_paths:
comps = [x for x in deleted_path.split("/") if x]
sub_cache = cache
for i, comp in enumerate(comps):
if ":" in comp:
_, iname = comp.split(":", maxsplit=1)
key_to_del = None
for k, v in sub_cache.items():
if (internal_names_as_keys and k == comp) or (
(not internal_names_as_keys)
and isinstance(v, dict)
and v.get(NameKey.INTERNAL.value) == iname
):
if i == len(comps) - 1:
key_to_del = k
else:
sub_cache = v
break
else:
break
if key_to_del:
del sub_cache[key_to_del]
else:
if comp in sub_cache:
sub_cache = sub_cache[comp]
else:
break

# Process deleted paths
self._process_deleted_paths(cache, deleted_paths, internal_names_as_keys)

# Update cache with new state items
for k, v in state.variant_map_state.item.items():
self._update_cache_from_variant_state(
rules,
Expand All @@ -342,6 +353,56 @@ def update_cache(
version,
)

def _process_deleted_paths(
self,
cache: Dict[str, Any],
deleted_paths: List[str],
internal_names_as_keys: bool,
):
"""Process and delete paths from the cache based on the deleted paths list."""
for deleted_path in deleted_paths:
comps = [x for x in deleted_path.split("/") if x]
self._delete_from_cache(cache, comps, internal_names_as_keys)

def _delete_from_cache(
self, sub_cache: Dict[str, Any], comps: List[str], internal_names_as_keys: bool
):
"""Recursively delete components from the cache."""
for i, comp in enumerate(comps):
if ":" in comp:
_, iname = comp.split(":", maxsplit=1)
key_to_del = self._find_key_to_delete(
sub_cache, comp, iname, i == len(comps) - 1, internal_names_as_keys
)
if key_to_del:
del sub_cache[key_to_del]
return # Exit after deletion
else:
if comp in sub_cache:
sub_cache = sub_cache[comp]
else:
break

def _find_key_to_delete(
self,
sub_cache: Dict[str, Any],
comp: str,
iname: str,
is_last_component: bool,
internal_names_as_keys: bool,
) -> Optional[str]:
"""Find the key to delete from the sub-cache."""
for k, v in sub_cache.items():
if (internal_names_as_keys and k == comp) or (
(not internal_names_as_keys)
and isinstance(v, dict)
and v.get(NameKey.INTERNAL.value) == iname
):
return (
k if is_last_component else None
) # Return key if it's the last component
return None # No key found to delete

@staticmethod
def _dm_path_comp(comp):
return ":".join(comp) if comp[1] else comp[0]
Expand Down

0 comments on commit 8396888

Please sign in to comment.