Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Fluent Connection #2617

Merged
merged 24 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
52c4f9f
refactor: Fluent Connection
prmukherj Mar 26, 2024
bf3a0a9
Fix import.
prmukherj Mar 26, 2024
af10c13
Fix watchdog.
prmukherj Mar 27, 2024
32016f2
Merge branch 'main' into maint/fluent_connection_refactor
prmukherj Mar 28, 2024
0a34962
refactor: More FluentConnection refactoring. (#2636)
prmukherj Mar 31, 2024
875664a
Add docstring.
prmukherj Apr 1, 2024
b997465
Merge branch 'main' into maint/fluent_connection_refactor
prmukherj Apr 1, 2024
8fd5706
Move channel formation.
prmukherj Apr 3, 2024
01668b9
More restructuring.
prmukherj Apr 3, 2024
915c218
Merge branch 'main' into maint/fluent_connection_refactor
prmukherj Apr 3, 2024
d174d1e
Update connection props changes.
prmukherj Apr 4, 2024
323920f
More....
prmukherj Apr 4, 2024
0763fa5
More....
prmukherj Apr 4, 2024
d66a6df
Revert.
prmukherj Apr 4, 2024
ab751ac
refactor: Move scheme eval. (#2658)
prmukherj Apr 8, 2024
0b8fc1f
Merge branch 'main' into maint/fluent_connection_refactor
prmukherj Apr 8, 2024
ade8d1c
Update src/ansys/fluent/core/fluent_connection.py
prmukherj Apr 8, 2024
8223768
Update src/ansys/fluent/core/fluent_connection.py
prmukherj Apr 8, 2024
39f1813
Update src/ansys/fluent/core/fluent_connection.py
prmukherj Apr 8, 2024
a9783a4
Update src/ansys/fluent/core/fluent_connection.py
prmukherj Apr 8, 2024
2cb4459
Update src/ansys/fluent/core/session.py
prmukherj Apr 8, 2024
fc6f2ad
Update src/ansys/fluent/core/session.py
prmukherj Apr 8, 2024
130a03c
Update docstrings.
prmukherj Apr 8, 2024
ae14213
Merge branch 'main' into maint/fluent_connection_refactor
prmukherj Apr 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 92 additions & 63 deletions src/ansys/fluent/core/fluent_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import socket
import subprocess
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Callable, List, Optional, Tuple, Union
import warnings
import weakref

Expand Down Expand Up @@ -201,6 +201,84 @@ def list_values(self) -> dict:
return vars(self)


def _get_ip_and_port(
ip: Optional[str] = None, port: Optional[int] = None
) -> (str, int):
if not ip:
ip = os.getenv("PYFLUENT_FLUENT_IP", "127.0.0.1")
if not port:
port = os.getenv("PYFLUENT_FLUENT_PORT")
if not port:
raise PortNotProvided()
return ip, port


def _get_channel(ip: str, port: int):
# Same maximum message length is used in the server
max_message_length = _get_max_c_int_limit()
return grpc.insecure_channel(
f"{ip}:{port}",
options=[
("grpc.max_send_message_length", max_message_length),
("grpc.max_receive_message_length", max_message_length),
],
)


class _ConnectionInterface:
def __init__(self, create_grpc_service, error_state):
self._scheme_eval_service = create_grpc_service(SchemeEvalService, error_state)
self.scheme_eval = service_creator("scheme_eval").create(
self._scheme_eval_service
)

@property
def product_build_info(self) -> str:
"""Get Fluent build info."""
prmukherj marked this conversation as resolved.
Show resolved Hide resolved
build_time = self.scheme_eval.scheme_eval("(inquire-build-time)")
build_id = self.scheme_eval.scheme_eval("(inquire-build-id)")
rev = self.scheme_eval.scheme_eval("(inquire-src-vcs-id)")
branch = self.scheme_eval.scheme_eval("(inquire-src-vcs-branch)")
return f"Build Time: {build_time} Build Id: {build_id} Revision: {rev} Branch: {branch}"

def get_cortex_connection_properties(self):
"""Get connection properties of Fluent."""
from grpc._channel import _InactiveRpcError

try:
logger.info(self.product_build_info)
logger.debug("Obtaining Cortex connection properties...")
fluent_host_pid = self.scheme_eval.scheme_eval("(cx-client-id)")
cortex_host = self.scheme_eval.scheme_eval("(cx-cortex-host)")
cortex_pid = self.scheme_eval.scheme_eval("(cx-cortex-id)")
cortex_pwd = self.scheme_eval.scheme_eval("(cortex-pwd)")
logger.debug("Cortex connection properties successfully obtained.")
except _InactiveRpcError:
logger.warning(
"Fluent Cortex properties unobtainable, force exit and other"
"methods are not going to work properly, proceeding..."
prmukherj marked this conversation as resolved.
Show resolved Hide resolved
)
fluent_host_pid = None
cortex_host = None
cortex_pid = None
cortex_pwd = None

return fluent_host_pid, cortex_host, cortex_pid, cortex_pwd

def is_solver_mode(self):
"""Checks if the Fluent session is in solver mode.

Returns
--------
``True`` if the Fluent session is in solver mode, ``False`` otherwise.
"""
return self.scheme_eval.scheme_eval("(cx-solver-mode?)")

def exit_server(self):
"""Exits the server."""
self.scheme_eval.exec(("(exit-server)",))


class FluentConnection:
"""Encapsulates a Fluent connection.

Expand All @@ -224,9 +302,8 @@ def __init__(
password: Optional[str] = None,
channel: Optional[grpc.Channel] = None,
cleanup_on_exit: bool = True,
start_transcript: bool = True,
remote_instance: Optional[Instance] = None,
launcher_args: Optional[Dict[str, Any]] = None,
slurm_job_id: Optional[str] = None,
inside_container: Optional[bool] = None,
):
"""Initialize a Session.
Expand All @@ -252,14 +329,12 @@ def __init__(
When True, the connected Fluent session will be shut down
when PyFluent is exited or exit() is called on the session
instance, by default True.
start_transcript : bool, optional
The Fluent transcript is started in the client only when
start_transcript is True. It can be started and stopped
subsequently via method calls on the Session object.
remote_instance : ansys.platform.instancemanagement.Instance
The corresponding remote instance when Fluent is launched through
PyPIM. This instance will be deleted when calling
``Session.exit()``.
slurm_job_id: bool, optional
The job id of a Fluent session running within a Slurm environment.
prmukherj marked this conversation as resolved.
Show resolved Hide resolved
inside_container: bool, optional
Whether the Fluent session that is being connected to
is running inside a docker container.
prmukherj marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -277,22 +352,9 @@ def __init__(
if channel is not None:
self._channel = channel
else:
if not ip:
ip = os.getenv("PYFLUENT_FLUENT_IP", "127.0.0.1")
if not port:
port = os.getenv("PYFLUENT_FLUENT_PORT")
ip, port = _get_ip_and_port(ip, port)
self._channel = _get_channel(ip, port)
self._channel_str = f"{ip}:{port}"
if not port:
raise PortNotProvided()
# Same maximum message length is used in the server
max_message_length = _get_max_c_int_limit()
self._channel = grpc.insecure_channel(
f"{ip}:{port}",
options=[
("grpc.max_send_message_length", max_message_length),
("grpc.max_receive_message_length", max_message_length),
],
)
self._metadata: List[Tuple[str, str]] = (
[("password", password)] if password else []
)
Expand All @@ -304,44 +366,21 @@ def __init__(
# throws, we should not proceed.
self.health_check.check_health()

self._slurm_job_id = launcher_args and launcher_args.get("slurm_job_id")
self._slurm_job_id = slurm_job_id

self._id = f"session-{next(FluentConnection._id_iter)}"

if not FluentConnection._monitor_thread:
FluentConnection._monitor_thread = MonitorThread()
FluentConnection._monitor_thread.start()

# Move this service later.
# Currently, required by launcher to connect to a running session.
self._scheme_eval_service = self.create_grpc_service(
SchemeEvalService, self._error_state
self._connection_interface = _ConnectionInterface(
self.create_grpc_service, self._error_state
)
self.scheme_eval = service_creator("scheme_eval").create(
self._scheme_eval_service
fluent_host_pid, cortex_host, cortex_pid, cortex_pwd = (
self._connection_interface.get_cortex_connection_properties()
)

self._cleanup_on_exit = cleanup_on_exit
self.start_transcript = start_transcript
from grpc._channel import _InactiveRpcError

try:
logger.info(self.fluent_build_info)
logger.debug("Obtaining Cortex connection properties...")
fluent_host_pid = self.scheme_eval.scheme_eval("(cx-client-id)")
cortex_host = self.scheme_eval.scheme_eval("(cx-cortex-host)")
cortex_pid = self.scheme_eval.scheme_eval("(cx-cortex-id)")
cortex_pwd = self.scheme_eval.scheme_eval("(cortex-pwd)")
logger.debug("Cortex connection properties successfully obtained.")
except _InactiveRpcError:
logger.warning(
"Fluent Cortex properties unobtainable, force exit and other"
"methods are not going to work properly, proceeding..."
)
cortex_host = None
cortex_pid = None
cortex_pwd = None
fluent_host_pid = None

if (
(inside_container is None or inside_container is True)
Expand Down Expand Up @@ -371,7 +410,6 @@ def __init__(
)

self._remote_instance = remote_instance
self.launcher_args = launcher_args

self._exit_event = threading.Event()

Expand All @@ -389,22 +427,13 @@ def __init__(
FluentConnection._exit,
self._channel,
self._cleanup_on_exit,
self.scheme_eval,
self._connection_interface,
self.finalizer_cbs,
self._remote_instance,
self._exit_event,
)
FluentConnection._monitor_thread.cbs.append(self._finalizer)

@property
def fluent_build_info(self) -> str:
"""Get Fluent build info."""
build_time = self.scheme_eval.scheme_eval("(inquire-build-time)")
build_id = self.scheme_eval.scheme_eval("(inquire-build-id)")
rev = self.scheme_eval.scheme_eval("(inquire-src-vcs-id)")
branch = self.scheme_eval.scheme_eval("(inquire-src-vcs-branch)")
return f"Build Time: {build_time} Build Id: {build_id} Revision: {rev} Branch: {branch}"

def _close_slurm(self):
subprocess.run(["scancel", f"{self._slurm_job_id}"])

Expand Down Expand Up @@ -665,7 +694,7 @@ def exit(
def _exit(
channel,
cleanup_on_exit,
scheme_eval,
connection_interface,
finalizer_cbs,
remote_instance,
exit_event,
Expand All @@ -676,7 +705,7 @@ def _exit(
cb()
if cleanup_on_exit:
try:
scheme_eval.exec(("(exit-server)",))
connection_interface.exit_server()
except Exception:
pass
channel.close()
Expand Down
4 changes: 2 additions & 2 deletions src/ansys/fluent/core/launcher/container_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ def __call__(self):
port=port,
password=password,
cleanup_on_exit=self.cleanup_on_exit,
start_transcript=self.start_transcript,
launcher_args=self.argvals,
slurm_job_id=self.argvals and self.argvals.get("slurm_job_id"),
inside_container=True,
),
file_transfer_service=self.file_transfer_service,
start_transcript=self.start_transcript,
)

if self.start_watchdog is None and self.cleanup_on_exit:
Expand Down
2 changes: 1 addition & 1 deletion src/ansys/fluent/core/launcher/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ def connect_to_fluent(
port=port,
password=password,
cleanup_on_exit=cleanup_on_exit,
start_transcript=start_transcript,
)
new_session = _get_running_session_mode(fluent_connection)

Expand All @@ -374,4 +373,5 @@ def connect_to_fluent(

return new_session(
fluent_connection=fluent_connection,
start_transcript=start_transcript,
)
7 changes: 4 additions & 3 deletions src/ansys/fluent/core/launcher/pim_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ def launch_remote_fluent(
channel=channel,
cleanup_on_exit=cleanup_on_exit,
remote_instance=instance,
start_transcript=start_transcript,
launcher_args=launcher_args,
slurm_job_id=launcher_args and launcher_args.get("slurm_job_id"),
)

file_transfer_service = (
Expand All @@ -303,5 +302,7 @@ def launch_remote_fluent(
)

return session_cls(
fluent_connection=fluent_connection, file_transfer_service=file_transfer_service
fluent_connection=fluent_connection,
file_transfer_service=file_transfer_service,
start_transcript=start_transcript,
)
2 changes: 1 addition & 1 deletion src/ansys/fluent/core/launcher/pyfluent_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _get_running_session_mode(
try:
session_mode = FluentMode.get_mode(
"solver"
if fluent_connection.scheme_eval.scheme_eval("(cx-solver-mode?)")
if fluent_connection._connection_interface.is_solver_mode()
else "meshing"
)
except Exception as ex:
Expand Down
3 changes: 1 addition & 2 deletions src/ansys/fluent/core/launcher/watchdog_exec
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ if __name__ == "__main__":
"ip": ip,
"port": int(port),
"password": password,
"launcher_args": None,
"start_transcript": False,
"slurm_job_id": None,
"cleanup_on_exit": True,
}

Expand Down
Loading
Loading