Skip to content

Commit

Permalink
Merge branch 'main' into maint/refactor_meshing_workflow_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
prmukherj authored Apr 9, 2024
2 parents cc88493 + ad066f8 commit 7bc7613
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 89 deletions.
157 changes: 93 additions & 64 deletions src/ansys/fluent/core/fluent_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import socket
import subprocess
import threading
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Callable, List, Optional, Tuple, Union
import warnings
import weakref

Expand Down Expand Up @@ -201,6 +201,84 @@ def list_values(self) -> dict:
return vars(self)


def _get_ip_and_port(
ip: Optional[str] = None, port: Optional[int] = None
) -> (str, int):
if not ip:
ip = os.getenv("PYFLUENT_FLUENT_IP", "127.0.0.1")
if not port:
port = os.getenv("PYFLUENT_FLUENT_PORT")
if not port:
raise PortNotProvided()
return ip, port


def _get_channel(ip: str, port: int):
# Same maximum message length is used in the server
max_message_length = _get_max_c_int_limit()
return grpc.insecure_channel(
f"{ip}:{port}",
options=[
("grpc.max_send_message_length", max_message_length),
("grpc.max_receive_message_length", max_message_length),
],
)


class _ConnectionInterface:
def __init__(self, create_grpc_service, error_state):
self._scheme_eval_service = create_grpc_service(SchemeEvalService, error_state)
self.scheme_eval = service_creator("scheme_eval").create(
self._scheme_eval_service
)

@property
def product_build_info(self) -> str:
"""Get Fluent build information."""
build_time = self.scheme_eval.scheme_eval("(inquire-build-time)")
build_id = self.scheme_eval.scheme_eval("(inquire-build-id)")
rev = self.scheme_eval.scheme_eval("(inquire-src-vcs-id)")
branch = self.scheme_eval.scheme_eval("(inquire-src-vcs-branch)")
return f"Build Time: {build_time} Build Id: {build_id} Revision: {rev} Branch: {branch}"

def get_cortex_connection_properties(self):
"""Get connection properties of Fluent."""
from grpc._channel import _InactiveRpcError

try:
logger.info(self.product_build_info)
logger.debug("Obtaining Cortex connection properties...")
fluent_host_pid = self.scheme_eval.scheme_eval("(cx-client-id)")
cortex_host = self.scheme_eval.scheme_eval("(cx-cortex-host)")
cortex_pid = self.scheme_eval.scheme_eval("(cx-cortex-id)")
cortex_pwd = self.scheme_eval.scheme_eval("(cortex-pwd)")
logger.debug("Cortex connection properties successfully obtained.")
except _InactiveRpcError:
logger.warning(
"Fluent Cortex properties unobtainable. 'force exit()' and other"
"methods are not going to work properly. Proceeding..."
)
fluent_host_pid = None
cortex_host = None
cortex_pid = None
cortex_pwd = None

return fluent_host_pid, cortex_host, cortex_pid, cortex_pwd

def is_solver_mode(self):
"""Checks if the Fluent session is in solver mode.
Returns
--------
``True`` if the Fluent session is in solver mode, ``False`` otherwise.
"""
return self.scheme_eval.scheme_eval("(cx-solver-mode?)")

def exit_server(self):
"""Exits the server."""
self.scheme_eval.exec(("(exit-server)",))


class FluentConnection:
"""Encapsulates a Fluent connection.
Expand All @@ -224,9 +302,8 @@ def __init__(
password: Optional[str] = None,
channel: Optional[grpc.Channel] = None,
cleanup_on_exit: bool = True,
start_transcript: bool = True,
remote_instance: Optional[Instance] = None,
launcher_args: Optional[Dict[str, Any]] = None,
slurm_job_id: Optional[str] = None,
inside_container: Optional[bool] = None,
):
"""Initialize a Session.
Expand All @@ -252,17 +329,15 @@ def __init__(
When True, the connected Fluent session will be shut down
when PyFluent is exited or exit() is called on the session
instance, by default True.
start_transcript : bool, optional
The Fluent transcript is started in the client only when
start_transcript is True. It can be started and stopped
subsequently via method calls on the Session object.
remote_instance : ansys.platform.instancemanagement.Instance
The corresponding remote instance when Fluent is launched through
PyPIM. This instance will be deleted when calling
``Session.exit()``.
slurm_job_id: bool, optional
Job ID of a Fluent session running within a Slurm environment.
inside_container: bool, optional
Whether the Fluent session that is being connected to
is running inside a docker container.
is running inside a Docker container.
Raises
------
Expand All @@ -277,22 +352,9 @@ def __init__(
if channel is not None:
self._channel = channel
else:
if not ip:
ip = os.getenv("PYFLUENT_FLUENT_IP", "127.0.0.1")
if not port:
port = os.getenv("PYFLUENT_FLUENT_PORT")
ip, port = _get_ip_and_port(ip, port)
self._channel = _get_channel(ip, port)
self._channel_str = f"{ip}:{port}"
if not port:
raise PortNotProvided()
# Same maximum message length is used in the server
max_message_length = _get_max_c_int_limit()
self._channel = grpc.insecure_channel(
f"{ip}:{port}",
options=[
("grpc.max_send_message_length", max_message_length),
("grpc.max_receive_message_length", max_message_length),
],
)
self._metadata: List[Tuple[str, str]] = (
[("password", password)] if password else []
)
Expand All @@ -304,44 +366,21 @@ def __init__(
# throws, we should not proceed.
self.health_check.check_health()

self._slurm_job_id = launcher_args and launcher_args.get("slurm_job_id")
self._slurm_job_id = slurm_job_id

self._id = f"session-{next(FluentConnection._id_iter)}"

if not FluentConnection._monitor_thread:
FluentConnection._monitor_thread = MonitorThread()
FluentConnection._monitor_thread.start()

# Move this service later.
# Currently, required by launcher to connect to a running session.
self._scheme_eval_service = self.create_grpc_service(
SchemeEvalService, self._error_state
self._connection_interface = _ConnectionInterface(
self.create_grpc_service, self._error_state
)
self.scheme_eval = service_creator("scheme_eval").create(
self._scheme_eval_service
fluent_host_pid, cortex_host, cortex_pid, cortex_pwd = (
self._connection_interface.get_cortex_connection_properties()
)

self._cleanup_on_exit = cleanup_on_exit
self.start_transcript = start_transcript
from grpc._channel import _InactiveRpcError

try:
logger.info(self.fluent_build_info)
logger.debug("Obtaining Cortex connection properties...")
fluent_host_pid = self.scheme_eval.scheme_eval("(cx-client-id)")
cortex_host = self.scheme_eval.scheme_eval("(cx-cortex-host)")
cortex_pid = self.scheme_eval.scheme_eval("(cx-cortex-id)")
cortex_pwd = self.scheme_eval.scheme_eval("(cortex-pwd)")
logger.debug("Cortex connection properties successfully obtained.")
except _InactiveRpcError:
logger.warning(
"Fluent Cortex properties unobtainable, force exit and other"
"methods are not going to work properly, proceeding..."
)
cortex_host = None
cortex_pid = None
cortex_pwd = None
fluent_host_pid = None

if (
(inside_container is None or inside_container is True)
Expand Down Expand Up @@ -371,7 +410,6 @@ def __init__(
)

self._remote_instance = remote_instance
self.launcher_args = launcher_args

self._exit_event = threading.Event()

Expand All @@ -389,22 +427,13 @@ def __init__(
FluentConnection._exit,
self._channel,
self._cleanup_on_exit,
self.scheme_eval,
self._connection_interface,
self.finalizer_cbs,
self._remote_instance,
self._exit_event,
)
FluentConnection._monitor_thread.cbs.append(self._finalizer)

@property
def fluent_build_info(self) -> str:
"""Get Fluent build info."""
build_time = self.scheme_eval.scheme_eval("(inquire-build-time)")
build_id = self.scheme_eval.scheme_eval("(inquire-build-id)")
rev = self.scheme_eval.scheme_eval("(inquire-src-vcs-id)")
branch = self.scheme_eval.scheme_eval("(inquire-src-vcs-branch)")
return f"Build Time: {build_time} Build Id: {build_id} Revision: {rev} Branch: {branch}"

def _close_slurm(self):
subprocess.run(["scancel", f"{self._slurm_job_id}"])

Expand Down Expand Up @@ -665,7 +694,7 @@ def exit(
def _exit(
channel,
cleanup_on_exit,
scheme_eval,
connection_interface,
finalizer_cbs,
remote_instance,
exit_event,
Expand All @@ -676,7 +705,7 @@ def _exit(
cb()
if cleanup_on_exit:
try:
scheme_eval.exec(("(exit-server)",))
connection_interface.exit_server()
except Exception:
pass
channel.close()
Expand Down
4 changes: 2 additions & 2 deletions src/ansys/fluent/core/launcher/container_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ def __call__(self):
port=port,
password=password,
cleanup_on_exit=self.cleanup_on_exit,
start_transcript=self.start_transcript,
launcher_args=self.argvals,
slurm_job_id=self.argvals and self.argvals.get("slurm_job_id"),
inside_container=True,
),
file_transfer_service=self.file_transfer_service,
start_transcript=self.start_transcript,
)

if self.start_watchdog is None and self.cleanup_on_exit:
Expand Down
2 changes: 1 addition & 1 deletion src/ansys/fluent/core/launcher/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ def connect_to_fluent(
port=port,
password=password,
cleanup_on_exit=cleanup_on_exit,
start_transcript=start_transcript,
)
new_session = _get_running_session_mode(fluent_connection)

Expand All @@ -374,4 +373,5 @@ def connect_to_fluent(

return new_session(
fluent_connection=fluent_connection,
start_transcript=start_transcript,
)
7 changes: 4 additions & 3 deletions src/ansys/fluent/core/launcher/pim_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ def launch_remote_fluent(
channel=channel,
cleanup_on_exit=cleanup_on_exit,
remote_instance=instance,
start_transcript=start_transcript,
launcher_args=launcher_args,
slurm_job_id=launcher_args and launcher_args.get("slurm_job_id"),
)

file_transfer_service = (
Expand All @@ -303,5 +302,7 @@ def launch_remote_fluent(
)

return session_cls(
fluent_connection=fluent_connection, file_transfer_service=file_transfer_service
fluent_connection=fluent_connection,
file_transfer_service=file_transfer_service,
start_transcript=start_transcript,
)
2 changes: 1 addition & 1 deletion src/ansys/fluent/core/launcher/pyfluent_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def _get_running_session_mode(
try:
session_mode = FluentMode.get_mode(
"solver"
if fluent_connection.scheme_eval.scheme_eval("(cx-solver-mode?)")
if fluent_connection._connection_interface.is_solver_mode()
else "meshing"
)
except Exception as ex:
Expand Down
3 changes: 1 addition & 2 deletions src/ansys/fluent/core/launcher/watchdog_exec
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ if __name__ == "__main__":
"ip": ip,
"port": int(port),
"password": password,
"launcher_args": None,
"start_transcript": False,
"slurm_job_id": None,
"cleanup_on_exit": True,
}

Expand Down
Loading

0 comments on commit 7bc7613

Please sign in to comment.