Skip to content

Commit

Permalink
Merge branch 'feat/load_saved_workflows' into feat/create_new_workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
prmukherj authored Apr 19, 2024
2 parents 9094611 + ffad517 commit f9b2d75
Show file tree
Hide file tree
Showing 7 changed files with 421 additions and 57 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ jobs:
version: 241
- image-tag: v24.2.0
version: 242
timeout-minutes: 120

steps:

Expand Down
139 changes: 118 additions & 21 deletions src/ansys/fluent/core/services/datamodel_se.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import itertools
import logging
import os
from threading import RLock
from typing import Any, Callable, Iterator, NoReturn, Optional, Sequence, Union

from google.protobuf.json_format import MessageToDict, ParseDict
Expand Down Expand Up @@ -334,6 +335,7 @@ class EventSubscription:
def __init__(
self,
service,
path,
request_dict: dict[str, Any],
) -> None:
"""Subscribe to a datamodel event.
Expand All @@ -343,16 +345,17 @@ def __init__(
SubscribeEventError
If server fails to subscribe from event.
"""
self.is_subscribed = False
self._service = service
self.is_subscribed: bool = False
self._service: DatamodelService = service
self.path: str = path
response = service.subscribe_events(request_dict)
response = response[0]
if response["status"] != DataModelProtoModule.STATUS_SUBSCRIBED:
raise SubscribeEventError(request_dict)
else:
self.is_subscribed = True
self.tag = response["tag"]
self._service.events[self.tag] = self
self.tag: str = response["tag"]
self._service.subscriptions.add(self.tag, self)

def unsubscribe(self) -> None:
"""Unsubscribe the datamodel event.
Expand All @@ -370,11 +373,79 @@ def unsubscribe(self) -> None:
raise UnsubscribeEventError(self.tag)
else:
self.is_subscribed = False
self._service.events.pop(self.tag, None)
self._service.subscriptions.remove(self.tag)

def __del__(self) -> None:
"""Unsubscribe the datamodel event."""
self.unsubscribe()

class SubscriptionList:
"""Stores subscription objects by tag."""

def __init__(self):
self._subscriptions = {}
self._lock = RLock()

def __contains__(self, tag: str) -> bool:
with self._lock:
return tag in self._subscriptions

def add(self, tag: str, subscription: EventSubscription) -> None:
"""Add a subscription object.
Parameters
----------
tag : str
Subscription tag.
subscription : EventSubscription
Subscription object.
"""
with self._lock:
self._subscriptions[tag] = subscription

def remove(self, tag: str) -> None:
"""Remove a subscription object.
Parameters
----------
tag : str
Subscription tag.
"""
with self._lock:
self._subscriptions.pop(tag, None)

def unsubscribe_while_deleting(
self, rules: str, path: str, deletion_stage: str
) -> None:
"""Unsubscribe corresponding subscription objects while the datamodel object is
being deleted.
Parameters
----------
rules : str
Datamodel object rules.
path : str
Datamodel object path.
deletion_stage : {"before", "after"}
All subscription objects except those of on-deleted type are unsubscribed
before the datamodel object is deleted. On-deleted subscription objects are
unsubscribed after the datamodel object is deleted.
"""
with self._lock:
delete_tag = f"/{rules}/deleted"
after = deletion_stage == "after"
keys_to_unsubscribe = []
for k, v in self._subscriptions.items():
if v.path.startswith(path) and not (
after ^ v.tag.startswith(delete_tag)
):
keys_to_unsubscribe.append(k)
for k in reversed(keys_to_unsubscribe):
self._subscriptions[k].unsubscribe()

def unsubscribe_all(self) -> None:
"""Unsubscribe all subscription objects."""
with self._lock:
while self._subscriptions:
v = next(reversed(self._subscriptions.values()))
v.unsubscribe()


class DatamodelService(StreamingService):
Expand All @@ -394,7 +465,7 @@ def __init__(
metadata=metadata,
)
self.event_streaming = None
self.events = {}
self.subscriptions = SubscriptionList()
self.file_transfer_service = file_transfer_service

def get_attribute_value(self, rules: str, path: str, attribute: str) -> _TValue:
Expand Down Expand Up @@ -561,9 +632,7 @@ def unsubscribe_events(self, tags: list[str]) -> dict[str, Any]:

def unsubscribe_all_events(self) -> None:
"""Unsubscribe all subscribed events."""
for event in list(self.events.values()):
event.unsubscribe()
self.events.clear()
self.subscriptions.unsubscribe_all()

def add_on_child_created(
self, rules: str, path: str, child_type: str, obj, cb: Callable
Expand All @@ -580,7 +649,7 @@ def add_on_child_created(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -596,7 +665,7 @@ def add_on_deleted(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -612,7 +681,7 @@ def add_on_changed(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -628,7 +697,7 @@ def add_on_affected(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -647,7 +716,7 @@ def add_on_affected_at_type_path(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -666,7 +735,7 @@ def add_on_command_executed(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -685,7 +754,7 @@ def add_on_attribute_changed(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand All @@ -705,7 +774,7 @@ def add_on_command_attribute_changed(
}
]
}
subscription = EventSubscription(self, request_dict)
subscription = EventSubscription(self, path, request_dict)
self.event_streaming.register_callback(subscription.tag, obj, cb)
return subscription

Expand Down Expand Up @@ -1023,6 +1092,14 @@ def delete_child_objects(self, obj_type: str, child_names: list[str]):
child_names : List[str]
List of named objects.
"""
for child_name in child_names:
child_path = f"{convert_path_to_se_path(self.path)}/{obj_type}:{child_name}"
# delete_child_objects doesn't stream back on-deleted events. Thus
# unsubscribing all subscription objects before the deletion.
for stage in ["before", "after"]:
self.service.subscriptions.unsubscribe_while_deleting(
self.rules, child_path, stage
)
self.service.delete_child_objects(
self.rules, convert_path_to_se_path(self.path), obj_type, child_names
)
Expand All @@ -1037,6 +1114,13 @@ def delete_all_child_objects(self, obj_type):
obj_type: str
Type of the named object container.
"""
child_path = f"{convert_path_to_se_path(self.path)}/{obj_type}:"
# delete_all_child_objects doesn't stream back on-deleted events. Thus
# unsubscribing all subscription objects before the deletion.
for stage in ["before", "after"]:
self.service.subscriptions.unsubscribe_while_deleting(
self.rules, child_path, stage
)
self.service.delete_all_child_objects(
self.rules, convert_path_to_se_path(self.path), obj_type
)
Expand Down Expand Up @@ -1387,7 +1471,20 @@ def _del_item(self, key: str) -> None:
if key in self._get_child_object_display_names():
child_path = self.path[:-1]
child_path.append((self.path[-1][0], key))
self.service.delete_object(self.rules, convert_path_to_se_path(child_path))
se_path = convert_path_to_se_path(child_path)
# All subscription objects except those of on-deleted type are unsubscribed
# before the datamodel object is deleted.
self.service.subscriptions.unsubscribe_while_deleting(
self.rules, se_path, "before"
)
# On-deleted subscription objects are unsubscribed after the datamodel
# object is deleted.
self[key].add_on_deleted(
lambda _: self.service.subscriptions.unsubscribe_while_deleting(
self.rules, se_path, "after"
)
)
self.service.delete_object(self.rules, se_path)
else:
raise LookupError(
f"{key} is not found at path " f"{convert_path_to_se_path(self.path)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,30 @@ def _process_streaming(self, id, stream_begin_method, started_evt, *args, **kwar
)
with self._lock:
self._streaming = True
for tag, cb in self._cbs.items():
if tag == response.tag:
if response.HasField("createdEventResponse"):
childtype = response.createdEventResponse.childtype
childname = response.createdEventResponse.childname
child = getattr(cb[0], childtype)[childname]
cb[1](child)
elif response.HasField("attributeChangedEventResponse"):
value = response.attributeChangedEventResponse.value
cb[1](_convert_variant_to_value(value))
elif (
response.HasField("modifiedEventResponse")
or response.HasField("deletedEventResponse")
or response.HasField("affectedEventResponse")
or response.HasField(
"commandAttributeChangedEventResponse"
)
):
cb[1](cb[0])
elif response.HasField("commandExecutedEventResponse"):
command = response.commandExecutedEventResponse.command
args = _convert_variant_to_value(
response.commandExecutedEventResponse.args
)
cb[1](cb[0], command, args)
cb = self._cbs.get(response.tag, None)
if cb:
if response.HasField("createdEventResponse"):
childtype = response.createdEventResponse.childtype
childname = response.createdEventResponse.childname
child = getattr(cb[0], childtype)[childname]
cb[1](child)
elif response.HasField("attributeChangedEventResponse"):
value = response.attributeChangedEventResponse.value
cb[1](_convert_variant_to_value(value))
elif response.HasField("commandAttributeChangedEventResponse"):
value = response.commandAttributeChangedEventResponse.value
cb[1](_convert_variant_to_value(value))
elif (
response.HasField("modifiedEventResponse")
or response.HasField("deletedEventResponse")
or response.HasField("affectedEventResponse")
):
cb[1](cb[0])
elif response.HasField("commandExecutedEventResponse"):
command = response.commandExecutedEventResponse.command
args = _convert_variant_to_value(
response.commandExecutedEventResponse.args
)
cb[1](cb[0], command, args)
except StopIteration:
break
7 changes: 1 addition & 6 deletions src/ansys/fluent/core/utils/file_transfer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from alive_progress import alive_bar

import ansys.fluent.core as pyfluent
from ansys.fluent.core.launcher.process_launch_string import get_fluent_exe_path
import ansys.platform.instancemanagement as pypim
import ansys.tools.filetransfer as ft

Expand Down Expand Up @@ -51,11 +50,7 @@ def __init__(self, server_cwd: Optional[str] = None):
"""
self.pyfluent_cwd = pathlib.Path(str(os.getcwd()))
self.fluent_cwd = (
pathlib.Path(str(server_cwd))
if server_cwd
else (
pathlib.Path(str(get_fluent_exe_path()).split("fluent")[0]) / "fluent"
)
pathlib.Path(str(server_cwd)) if server_cwd else self.pyfluent_cwd
)

def file_exists_on_remote(self, file_name: str) -> bool:
Expand Down
10 changes: 5 additions & 5 deletions src/ansys/fluent/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,9 @@ def insert_next_task(self, task_name: str):
Raises
------
ValueError
If the python name does not match the next possible task names.
If the Python name does not match the next possible task names.
"""
# The next line populates the python name map for next possible task
# The next line populates the Python name map for next possible task
self._get_next_python_task_names()
if task_name not in self.get_next_possible_tasks():
raise ValueError(
Expand All @@ -541,7 +541,7 @@ def insert_next_task(self, task_name: str):

@property
def next_tasks(self):
"""Tasks that can be inserted after this current task."""
"""Tasks that can be inserted after the current task."""
return self._NextTask(self)

class _NextTask:
Expand All @@ -561,12 +561,12 @@ def __call__(self):

class _Insert:
def __init__(self, base_task, name):
"""Initialize _Insert."""
"""Initialize an ``_Insert`` instance."""
self._base_task = base_task
self._name = name

def insert(self):
"""Inserts a task in the workflow."""
"""Insert a task in the workflow."""
return self._base_task.insert_next_task(task_name=self._name)

def __repr__(self):
Expand Down
Loading

0 comments on commit f9b2d75

Please sign in to comment.