diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 665eba0f3d9..a363b8d1ef3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -466,6 +466,7 @@ jobs: version: 241 - image-tag: v24.2.0 version: 242 + timeout-minutes: 120 steps: diff --git a/src/ansys/fluent/core/services/datamodel_se.py b/src/ansys/fluent/core/services/datamodel_se.py index b8cfd5a75cd..8c881c8e410 100644 --- a/src/ansys/fluent/core/services/datamodel_se.py +++ b/src/ansys/fluent/core/services/datamodel_se.py @@ -5,6 +5,7 @@ import itertools import logging import os +from threading import RLock from typing import Any, Callable, Iterator, NoReturn, Optional, Sequence, Union from google.protobuf.json_format import MessageToDict, ParseDict @@ -334,6 +335,7 @@ class EventSubscription: def __init__( self, service, + path, request_dict: dict[str, Any], ) -> None: """Subscribe to a datamodel event. @@ -343,16 +345,17 @@ def __init__( SubscribeEventError If server fails to subscribe from event. """ - self.is_subscribed = False - self._service = service + self.is_subscribed: bool = False + self._service: DatamodelService = service + self.path: str = path response = service.subscribe_events(request_dict) response = response[0] if response["status"] != DataModelProtoModule.STATUS_SUBSCRIBED: raise SubscribeEventError(request_dict) else: self.is_subscribed = True - self.tag = response["tag"] - self._service.events[self.tag] = self + self.tag: str = response["tag"] + self._service.subscriptions.add(self.tag, self) def unsubscribe(self) -> None: """Unsubscribe the datamodel event. @@ -370,11 +373,79 @@ def unsubscribe(self) -> None: raise UnsubscribeEventError(self.tag) else: self.is_subscribed = False - self._service.events.pop(self.tag, None) + self._service.subscriptions.remove(self.tag) - def __del__(self) -> None: - """Unsubscribe the datamodel event.""" - self.unsubscribe() + +class SubscriptionList: + """Stores subscription objects by tag.""" + + def __init__(self): + self._subscriptions = {} + self._lock = RLock() + + def __contains__(self, tag: str) -> bool: + with self._lock: + return tag in self._subscriptions + + def add(self, tag: str, subscription: EventSubscription) -> None: + """Add a subscription object. + + Parameters + ---------- + tag : str + Subscription tag. + subscription : EventSubscription + Subscription object. + """ + with self._lock: + self._subscriptions[tag] = subscription + + def remove(self, tag: str) -> None: + """Remove a subscription object. + + Parameters + ---------- + tag : str + Subscription tag. + """ + with self._lock: + self._subscriptions.pop(tag, None) + + def unsubscribe_while_deleting( + self, rules: str, path: str, deletion_stage: str + ) -> None: + """Unsubscribe corresponding subscription objects while the datamodel object is + being deleted. + + Parameters + ---------- + rules : str + Datamodel object rules. + path : str + Datamodel object path. + deletion_stage : {"before", "after"} + All subscription objects except those of on-deleted type are unsubscribed + before the datamodel object is deleted. On-deleted subscription objects are + unsubscribed after the datamodel object is deleted. + """ + with self._lock: + delete_tag = f"/{rules}/deleted" + after = deletion_stage == "after" + keys_to_unsubscribe = [] + for k, v in self._subscriptions.items(): + if v.path.startswith(path) and not ( + after ^ v.tag.startswith(delete_tag) + ): + keys_to_unsubscribe.append(k) + for k in reversed(keys_to_unsubscribe): + self._subscriptions[k].unsubscribe() + + def unsubscribe_all(self) -> None: + """Unsubscribe all subscription objects.""" + with self._lock: + while self._subscriptions: + v = next(reversed(self._subscriptions.values())) + v.unsubscribe() class DatamodelService(StreamingService): @@ -394,7 +465,7 @@ def __init__( metadata=metadata, ) self.event_streaming = None - self.events = {} + self.subscriptions = SubscriptionList() self.file_transfer_service = file_transfer_service def get_attribute_value(self, rules: str, path: str, attribute: str) -> _TValue: @@ -561,9 +632,7 @@ def unsubscribe_events(self, tags: list[str]) -> dict[str, Any]: def unsubscribe_all_events(self) -> None: """Unsubscribe all subscribed events.""" - for event in list(self.events.values()): - event.unsubscribe() - self.events.clear() + self.subscriptions.unsubscribe_all() def add_on_child_created( self, rules: str, path: str, child_type: str, obj, cb: Callable @@ -580,7 +649,7 @@ def add_on_child_created( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -596,7 +665,7 @@ def add_on_deleted( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -612,7 +681,7 @@ def add_on_changed( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -628,7 +697,7 @@ def add_on_affected( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -647,7 +716,7 @@ def add_on_affected_at_type_path( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -666,7 +735,7 @@ def add_on_command_executed( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -685,7 +754,7 @@ def add_on_attribute_changed( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -705,7 +774,7 @@ def add_on_command_attribute_changed( } ] } - subscription = EventSubscription(self, request_dict) + subscription = EventSubscription(self, path, request_dict) self.event_streaming.register_callback(subscription.tag, obj, cb) return subscription @@ -1023,6 +1092,14 @@ def delete_child_objects(self, obj_type: str, child_names: list[str]): child_names : List[str] List of named objects. """ + for child_name in child_names: + child_path = f"{convert_path_to_se_path(self.path)}/{obj_type}:{child_name}" + # delete_child_objects doesn't stream back on-deleted events. Thus + # unsubscribing all subscription objects before the deletion. + for stage in ["before", "after"]: + self.service.subscriptions.unsubscribe_while_deleting( + self.rules, child_path, stage + ) self.service.delete_child_objects( self.rules, convert_path_to_se_path(self.path), obj_type, child_names ) @@ -1037,6 +1114,13 @@ def delete_all_child_objects(self, obj_type): obj_type: str Type of the named object container. """ + child_path = f"{convert_path_to_se_path(self.path)}/{obj_type}:" + # delete_all_child_objects doesn't stream back on-deleted events. Thus + # unsubscribing all subscription objects before the deletion. + for stage in ["before", "after"]: + self.service.subscriptions.unsubscribe_while_deleting( + self.rules, child_path, stage + ) self.service.delete_all_child_objects( self.rules, convert_path_to_se_path(self.path), obj_type ) @@ -1387,7 +1471,20 @@ def _del_item(self, key: str) -> None: if key in self._get_child_object_display_names(): child_path = self.path[:-1] child_path.append((self.path[-1][0], key)) - self.service.delete_object(self.rules, convert_path_to_se_path(child_path)) + se_path = convert_path_to_se_path(child_path) + # All subscription objects except those of on-deleted type are unsubscribed + # before the datamodel object is deleted. + self.service.subscriptions.unsubscribe_while_deleting( + self.rules, se_path, "before" + ) + # On-deleted subscription objects are unsubscribed after the datamodel + # object is deleted. + self[key].add_on_deleted( + lambda _: self.service.subscriptions.unsubscribe_while_deleting( + self.rules, se_path, "after" + ) + ) + self.service.delete_object(self.rules, se_path) else: raise LookupError( f"{key} is not found at path " f"{convert_path_to_se_path(self.path)}" diff --git a/src/ansys/fluent/core/streaming_services/datamodel_event_streaming.py b/src/ansys/fluent/core/streaming_services/datamodel_event_streaming.py index f5ff9ee9b68..5c0ff5e1363 100644 --- a/src/ansys/fluent/core/streaming_services/datamodel_event_streaming.py +++ b/src/ansys/fluent/core/streaming_services/datamodel_event_streaming.py @@ -53,30 +53,30 @@ def _process_streaming(self, id, stream_begin_method, started_evt, *args, **kwar ) with self._lock: self._streaming = True - for tag, cb in self._cbs.items(): - if tag == response.tag: - if response.HasField("createdEventResponse"): - childtype = response.createdEventResponse.childtype - childname = response.createdEventResponse.childname - child = getattr(cb[0], childtype)[childname] - cb[1](child) - elif response.HasField("attributeChangedEventResponse"): - value = response.attributeChangedEventResponse.value - cb[1](_convert_variant_to_value(value)) - elif ( - response.HasField("modifiedEventResponse") - or response.HasField("deletedEventResponse") - or response.HasField("affectedEventResponse") - or response.HasField( - "commandAttributeChangedEventResponse" - ) - ): - cb[1](cb[0]) - elif response.HasField("commandExecutedEventResponse"): - command = response.commandExecutedEventResponse.command - args = _convert_variant_to_value( - response.commandExecutedEventResponse.args - ) - cb[1](cb[0], command, args) + cb = self._cbs.get(response.tag, None) + if cb: + if response.HasField("createdEventResponse"): + childtype = response.createdEventResponse.childtype + childname = response.createdEventResponse.childname + child = getattr(cb[0], childtype)[childname] + cb[1](child) + elif response.HasField("attributeChangedEventResponse"): + value = response.attributeChangedEventResponse.value + cb[1](_convert_variant_to_value(value)) + elif response.HasField("commandAttributeChangedEventResponse"): + value = response.commandAttributeChangedEventResponse.value + cb[1](_convert_variant_to_value(value)) + elif ( + response.HasField("modifiedEventResponse") + or response.HasField("deletedEventResponse") + or response.HasField("affectedEventResponse") + ): + cb[1](cb[0]) + elif response.HasField("commandExecutedEventResponse"): + command = response.commandExecutedEventResponse.command + args = _convert_variant_to_value( + response.commandExecutedEventResponse.args + ) + cb[1](cb[0], command, args) except StopIteration: break diff --git a/src/ansys/fluent/core/workflow.py b/src/ansys/fluent/core/workflow.py index 51a78420957..9cc6d723064 100644 --- a/src/ansys/fluent/core/workflow.py +++ b/src/ansys/fluent/core/workflow.py @@ -526,9 +526,9 @@ def insert_next_task(self, task_name: str): Raises ------ ValueError - If the python name does not match the next possible task names. + If the Python name does not match the next possible task names. """ - # The next line populates the python name map for next possible task + # The next line populates the Python name map for next possible task self._get_next_python_task_names() if task_name not in self.get_next_possible_tasks(): raise ValueError( @@ -541,7 +541,7 @@ def insert_next_task(self, task_name: str): @property def next_tasks(self): - """Tasks that can be inserted after this current task.""" + """Tasks that can be inserted after the current task.""" return self._NextTask(self) class _NextTask: @@ -561,12 +561,12 @@ def __call__(self): class _Insert: def __init__(self, base_task, name): - """Initialize _Insert.""" + """Initialize an ``_Insert`` instance.""" self._base_task = base_task self._name = name def insert(self): - """Inserts a task in the workflow.""" + """Insert a task in the workflow.""" return self._base_task.insert_next_task(task_name=self._name) def __repr__(self): diff --git a/tests/test_datamodel_service.py b/tests/test_datamodel_service.py index 95e09c2515a..a16a09eb077 100644 --- a/tests/test_datamodel_service.py +++ b/tests/test_datamodel_service.py @@ -1,3 +1,4 @@ +import gc from time import sleep from google.protobuf.json_format import MessageToDict @@ -9,12 +10,17 @@ import ansys.fluent.core as pyfluent from ansys.fluent.core import examples from ansys.fluent.core.services.datamodel_se import ( + PyCommand, + PyMenu, PyMenuGeneric, + PyNamedObjectContainer, + PyTextual, ReadOnlyObjectError, _convert_variant_to_value, convert_path_to_se_path, ) from ansys.fluent.core.streaming_services.datamodel_streaming import DatamodelStream +from ansys.fluent.core.utils.execution import timeout_loop @pytest.mark.fluent_version(">=23.2") @@ -487,3 +493,267 @@ def test_read_ony_set_state(new_mesh_session): assert "set_state" not in dir( meshing.preferences.MeshingWorkflow.CheckpointingOption ) + + +test_rules = ( + "RULES:\n" + " SINGLETON: ROOT\n" + " members = A\n" + " OBJECT: A\n" + " members = B, X\n" + " commands = C\n" + " isABC = $./X == ABC\n" + " OBJECT: B\n" + " END\n" + " STRING: X\n" + " default = IJK\n" + " END\n" + " COMMAND: C\n" + " returnType = Logical\n" + " functionName = S_C\n" + " isABC = $../X == ABC\n" + " END\n" + " END\n" + " END\n" + "END\n" +) + + +class test_root(PyMenu): + def __init__(self, service, rules, path): + self.A = self.__class__.A(service, rules, path + [("A", "")]) + super().__init__(service, rules, path) + + class A(PyNamedObjectContainer): + class _A(PyMenu): + def __init__(self, service, rules, path): + self.B = self.__class__.B(service, rules, path + [("B", "")]) + self.X = self.__class__.X(service, rules, path + [("X", "")]) + self.C = self.__class__.C(service, rules, "C", path) + super().__init__(service, rules, path) + + class B(PyNamedObjectContainer): + class _B(PyMenu): + pass + + class X(PyTextual): + pass + + class C(PyCommand): + pass + + +def _create_datamodel_root(session, rules_str) -> PyMenu: + rules_file_name = "test.fdl" + session.scheme_eval.scheme_eval( + f'(with-output-to-file "{rules_file_name}" (lambda () (format "~a" "{rules_str}")))' + ) + session.scheme_eval.scheme_eval( + '(state/register-new-state-engine "test" "test.fdl")' + ) + session.scheme_eval.scheme_eval(f'(remove-file "{rules_file_name}")') + assert session.scheme_eval.scheme_eval('(state/find-root "test")') > 0 + return test_root(session._se_service, "test", []) + + +@pytest.mark.fluent_version(">=24.2") +def test_on_child_created_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_child_created("B", lambda _: data.append(1)) + root.A["A1"].add_on_child_created("B", lambda _: data.append(2)) + gc.collect() + assert "/test/created/A:A1/B" in solver._se_service.subscriptions + assert "/test/created/A:A1/B-1" in solver._se_service.subscriptions + root.A["A1"].B["B1"] = {} + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert "/test/created/A:A1/B" not in solver._se_service.subscriptions + assert "/test/created/A:A1/B-1" not in solver._se_service.subscriptions + + +@pytest.mark.fluent_version(">=24.2") +def test_on_deleted_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_deleted(lambda _: data.append(1)) + root.A["A1"].add_on_deleted(lambda _: data.append(2)) + gc.collect() + assert "/test/deleted/A:A1" in solver._se_service.subscriptions + assert "/test/deleted/A:A1-1" in solver._se_service.subscriptions + del root.A["A1"] + assert timeout_loop(lambda: data == [1, 2], 5) + assert timeout_loop( + lambda: "/test/deleted/A:A1" not in solver._se_service.subscriptions, 5 + ) + assert timeout_loop( + lambda: "/test/deleted/A:A1-1" not in solver._se_service.subscriptions, 5 + ) + + +@pytest.mark.fluent_version(">=24.2") +def test_on_changed_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].X.add_on_changed(lambda _: data.append(1)) + root.A["A1"].X.add_on_changed(lambda _: data.append(2)) + gc.collect() + assert "/test/modified/A:A1/X" in solver._se_service.subscriptions + assert "/test/modified/A:A1/X-1" in solver._se_service.subscriptions + root.A["A1"].X = "ABC" + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert "/test/modified/A:A1/X" not in solver._se_service.subscriptions + assert "/test/modified/A:A1/X-1" not in solver._se_service.subscriptions + + +@pytest.mark.fluent_version(">=24.2") +def test_on_affected_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_affected(lambda _: data.append(1)) + root.A["A1"].add_on_affected(lambda _: data.append(2)) + gc.collect() + assert "/test/affected/A:A1" in solver._se_service.subscriptions + assert "/test/affected/A:A1-1" in solver._se_service.subscriptions + root.A["A1"].X = "ABC" + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert "/test/affected/A:A1" not in solver._se_service.subscriptions + assert "/test/affected/A:A1-1" not in solver._se_service.subscriptions + + +@pytest.mark.fluent_version(">=24.2") +def test_on_affected_at_type_path_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_affected_at_type_path("B", lambda _: data.append(1)) + root.A["A1"].add_on_affected_at_type_path("B", lambda _: data.append(2)) + gc.collect() + assert "/test/affected/A:A1/B" in solver._se_service.subscriptions + assert "/test/affected/A:A1/B-1" in solver._se_service.subscriptions + root.A["A1"].B["B1"] = {} + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert "/test/affected/A:A1/B" not in solver._se_service.subscriptions + assert "/test/affected/A:A1/B-1" not in solver._se_service.subscriptions + + +@pytest.mark.fluent_version(">=24.2") +def test_on_command_executed_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_command_executed("C", lambda *args: data.append(1)) + root.A["A1"].add_on_command_executed("C", lambda *args: data.append(2)) + gc.collect() + assert "/test/command_executed/A:A1/C" in solver._se_service.subscriptions + assert "/test/command_executed/A:A1/C-1" in solver._se_service.subscriptions + root.A["A1"].C() + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert "/test/command_executed/A:A1/C" not in solver._se_service.subscriptions + assert "/test/command_executed/A:A1/C-1" not in solver._se_service.subscriptions + + +@pytest.mark.fluent_version(">=24.2") +def test_on_attribute_changed_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_attribute_changed("isABC", lambda _: data.append(1)) + root.A["A1"].add_on_attribute_changed("isABC", lambda _: data.append(2)) + gc.collect() + assert "/test/attribute_changed/A:A1/isABC" in solver._se_service.subscriptions + assert "/test/attribute_changed/A:A1/isABC-1" in solver._se_service.subscriptions + root.A["A1"].X = "ABC" + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert "/test/attribute_changed/A:A1/isABC" not in solver._se_service.subscriptions + assert ( + "/test/attribute_changed/A:A1/isABC-1" not in solver._se_service.subscriptions + ) + + +@pytest.mark.fluent_version(">=24.2") +def test_on_command_attribute_changed_lifetime(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_command_attribute_changed( + "C", "isABC", lambda _: data.append(1) + ) + root.A["A1"].add_on_command_attribute_changed( + "C", "isABC", lambda _: data.append(2) + ) + gc.collect() + assert ( + "/test/command_attribute_changed/A:A1/C/isABC" + in solver._se_service.subscriptions + ) + assert ( + "/test/command_attribute_changed/A:A1/C/isABC-1" + in solver._se_service.subscriptions + ) + root.A["A1"].X = "ABC" + assert timeout_loop(lambda: data == [1, 2], 5) + del root.A["A1"] + assert ( + "/test/command_attribute_changed/A:A1/C/isABC" + not in solver._se_service.subscriptions + ) + assert ( + "/test/command_attribute_changed/A:A1/C/isABC-1" + not in solver._se_service.subscriptions + ) + + +@pytest.mark.fluent_version(">=24.2") +def test_on_affected_lifetime_with_delete_child_objects(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + pyfluent.logging.enable() + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_affected(lambda _: data.append(1)) + root.A["A1"].add_on_affected(lambda _: data.append(2)) + gc.collect() + assert "/test/affected/A:A1" in solver._se_service.subscriptions + assert "/test/affected/A:A1-1" in solver._se_service.subscriptions + root.A["A1"].X = "ABC" + assert timeout_loop(lambda: data == [1, 2], 5) + root.delete_child_objects("A", ["A1"]) + assert "/test/affected/A:A1" not in solver._se_service.subscriptions + assert "/test/affected/A:A1-1" not in solver._se_service.subscriptions + + +@pytest.mark.fluent_version(">=24.2") +def test_on_affected_lifetime_with_delete_all_child_objects(new_solver_session): + solver = new_solver_session + root = _create_datamodel_root(solver, test_rules) + pyfluent.logging.enable() + root.A["A1"] = {} + data = [] + h = root.A["A1"].add_on_affected(lambda _: data.append(1)) + root.A["A1"].add_on_affected(lambda _: data.append(2)) + gc.collect() + assert "/test/affected/A:A1" in solver._se_service.subscriptions + assert "/test/affected/A:A1-1" in solver._se_service.subscriptions + root.A["A1"].X = "ABC" + assert timeout_loop(lambda: data == [1, 2], 5) + root.delete_all_child_objects("A") + assert "/test/affected/A:A1" not in solver._se_service.subscriptions + assert "/test/affected/A:A1-1" not in solver._se_service.subscriptions diff --git a/tests/test_settings_api.py b/tests/test_settings_api.py index cd6e41477b9..8a03a152e60 100644 --- a/tests/test_settings_api.py +++ b/tests/test_settings_api.py @@ -320,6 +320,7 @@ def warning_record(): yield wrec +@pytest.mark.skip("https://github.com/ansys/pyfluent/issues/2712") @pytest.mark.fluent_version(">=24.2") def test_unstable_settings_warning(new_solver_session, warning_record): solver = new_solver_session