From 84bc55da0cdec3f52e54de510e95f7897c1f49b5 Mon Sep 17 00:00:00 2001 From: PanekOndrej Date: Thu, 13 Oct 2022 17:47:39 +0200 Subject: [PATCH 01/10] TcpOslListener class was added. --- src/ansys/optislang/core/optislang.py | 21 + src/ansys/optislang/core/osl_process.py | 18 +- src/ansys/optislang/core/osl_server.py | 5 + src/ansys/optislang/core/server_commands.py | 12 +- src/ansys/optislang/core/tcp_osl_server.py | 911 ++++++++++++-------- tests/test_optislang.py | 8 +- tests/test_tcp_osl_server.py | 18 +- 7 files changed, 607 insertions(+), 386 deletions(-) diff --git a/src/ansys/optislang/core/optislang.py b/src/ansys/optislang/core/optislang.py index 93041d4a5..8a2f6ca34 100644 --- a/src/ansys/optislang/core/optislang.py +++ b/src/ansys/optislang/core/optislang.py @@ -137,6 +137,13 @@ def __str__(self): f"PyOptiSLang: {version('ansys.optislang.core')}" ) + def __del__(self): + """Shutdown optiSLang.""" + if self.__host and self.__port: + self.shutdown() + else: + self.terminate_server_threads() + @property def name(self) -> str: """Instance unique identifier.""" @@ -510,3 +517,17 @@ def stop_gently(self, wait_for_finished: bool = True) -> None: Raised when the timeout float value expires. """ self.__osl_server.stop_gently(wait_for_finished) + + def terminate_server_threads(self) -> None: + """Terminate all local threads created by self.__osl_server. + + Raises + ------ + OslCommunicationError + Raised when an error occurs while communicating with server. + OslCommandError + Raised when the command or query fails. + TimeoutError + Raised when the timeout float value expires. + """ + self.__osl_server.terminate_server_threads() diff --git a/src/ansys/optislang/core/osl_process.py b/src/ansys/optislang/core/osl_process.py index 8ea017abf..3aa2b2c6a 100644 --- a/src/ansys/optislang/core/osl_process.py +++ b/src/ansys/optislang/core/osl_process.py @@ -35,6 +35,7 @@ class ServerNotification(Enum): ACTOR_CONTENTS_CHANGED = 14 ACTOR_DATA_CHANGED = 15 NUM_NOTIFICATIONS = 16 + ALL = 17 class OslServerProcess: @@ -74,7 +75,7 @@ class OslServerProcess: notifications : Iterable[ServerNotification], optional Notifications to be sent to the listener. Defaults to ``None``. shutdown_on_finished: bool, optional - Shut down when execution is finished. Defaults to ``True``. + Shut down when execution is finished. Defaults to ``False``. env_vars : Mapping[str, str], optional Additional environmental variables (key and value) for the optiSLang server process. @@ -618,21 +619,16 @@ def __terminate_osl_child_processes(self): process.terminate() except psutil.NoSuchProcess: self._logger.debug( - "Cannot terminate child process PID: %s. " "The process does not exist.", - process.pid, + f"Cannot terminate child process PID: {process.pid}. " + "The process does not exist." ) gone, alive = psutil.wait_procs(children, timeout=3) - for process in gone: - self._logger.debug( - "optiSLang server child process %s terminated with exit code %s.", - process, - process.returncode, - ) + for process in alive: self._logger.debug( - "optiSLang server child process %s could not be terminated and will be killed.", - process, + f"optiSLang server child process {process} could not be terminated " + "and will be killed.", ) process.kill() diff --git a/src/ansys/optislang/core/osl_server.py b/src/ansys/optislang/core/osl_server.py index 39f94c664..797cea214 100644 --- a/src/ansys/optislang/core/osl_server.py +++ b/src/ansys/optislang/core/osl_server.py @@ -484,3 +484,8 @@ def stop_gently(self, wait_for_finished: bool = True) -> None: Raised when the timeout float value expires. """ pass + + @abstractmethod + def terminate_server_threads(self) -> None: + """Terminate all local threads created by this instance.""" + pass diff --git a/src/ansys/optislang/core/server_commands.py b/src/ansys/optislang/core/server_commands.py index a6323f36a..e738fe428 100644 --- a/src/ansys/optislang/core/server_commands.py +++ b/src/ansys/optislang/core/server_commands.py @@ -1,6 +1,6 @@ """Module for generation of all server commands.""" import json -from typing import Dict, Sequence, Union +from typing import Dict, Iterable, Sequence, Union _APPLY_WIZARD = "APPLY_WIZARD" _CLOSE = "CLOSE" @@ -599,7 +599,7 @@ def register_listener( host: str = None, port: int = None, timeout: int = None, - notifications: Sequence = None, + notifications: Iterable[str] = None, password: str = None, ) -> str: """Generate JSON string of register_listener command. @@ -1391,10 +1391,10 @@ def subscribe_for_push_notifications( raise TypeError( f"Unsuppored values of ``notifications``: {invalid_items}, " "supported options are: \n" - "server: {server},\n" - "logging: {logging},\n" - "project: {project},\n" - "nodes: {nodes}" + f"server: {server},\n" + f"logging: {logging},\n" + f"project: {project},\n" + f"nodes: {nodes}" ) args["notifications"] = notifications diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index 74f0243a2..c56410f36 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -1,5 +1,4 @@ """Contains classes for plain TCP/IP communication with server.""" - from datetime import datetime import json import logging @@ -10,7 +9,7 @@ import struct import threading import time -from typing import Dict, Iterable, Sequence, Tuple, Union +from typing import Callable, Dict, List, Sequence, Tuple, Union import uuid from ansys.optislang.core import server_commands as commands @@ -24,7 +23,7 @@ OslCommunicationError, ResponseFormatError, ) -from ansys.optislang.core.osl_process import OslServerProcess +from ansys.optislang.core.osl_process import OslServerProcess, ServerNotification from ansys.optislang.core.osl_server import OslServer @@ -518,6 +517,267 @@ def _fetch_file(self, file_len: int, file_path: str, timeout: Union[float, None] data_len += len(chunk) +class TcpOslListener: + """Listener of optiSLang server. + + Parameters + ---------- + port_range: Tuple + Range of ports for listener. + timeout: float + Timeout in seconds to receive a message. Timeout exception will be raised + if the timeout period value has elapsed before the operation has completed. If ``None`` + is given, the blocking mode is used. + name: str + Name of listener. + host: str + Local IPv6 address. + uid: str, opt + Unique ID of listener, should be used only if listener is used for optiSLangs port + when started locally. + logger: OslLogger, opt + Preferably OslLogger should be given. If not given, default logging.Logger is used. + + Raises + ------ + ValueError + Raised when port_range != 2 or first number is higher. + TypeError + Raised when port_range not type Tuple[int, int] + TimeoutError + Raised when the timeout float value expires. + + Examples + -------- + Create listener + >>> from ansys.optislang.core.tcp_osl_server import TcpOslListener + >>> general_listener = TcpOslListener( + >>> port_range = self.__class__._PRIVATE_PORTS_RANGE, + >>> timeout = 30, + >>> name = 'GeneralListener', + >>> host = '127.0.0.1', + >>> uid = str(uuid.uuid4()), + >>> logger = logging.getLogger(__name__), + >>> ) + """ + + def __init__( + self, + port_range: Tuple, + timeout: float, + name: str, + host: str = None, + uid: str = None, + logger=None, + ): + """Initialize a new instance of the ``TcpOslListener`` class.""" + self.__uid = uid + self.__name = name + self.__timeout = timeout + self.__listener_socket = None + self.__thread = None + self.__callbacks = [] + self.__run_listening_thread = False + + if logger is None: + self._logger = logging.getLogger(__name__) + else: + self._logger = logger + + if len(port_range) != 2: + raise ValueError(f"Port ranges length must be 2 but: len = {len(port_range)}") + if isinstance(port_range, (int, int)): + raise TypeError( + "Port range not type Tuple[int, int] but:" + f"[{type(port_range[0])}, {port_range[1]}]." + ) + if port_range[0] > port_range[1]: + raise ValueError("First number is higher.") + + self.__init_listener_socket(host=host, port_range=port_range) + + def is_initialized(self) -> bool: + """Return True if listener was initialized.""" + return self.__listener_socket is not None + + def dispose(self) -> None: + """Delete listeners socket if exists.""" + if self.__listener_socket is not None: + self.__listener_socket.close() + + @property + def uid(self): + """Instance unique identifier.""" + return self.__uid + + @uid.setter + def uid(self, uid): + self.__uid = uid + + @property + def name(self): + """Instance name used for naming self.__thread.""" + return self.__name + + @property + def timeout(self): + """Timeout in seconds to receive a message.""" + return self.__timeout + + @timeout.setter + def timeout(self, timeout): + self.__timeout = timeout + + @property + def host(self): + """Local IPv6 address associated with self.__listener_socket.""" + return self.__listener_socket.getsockname()[0] + + @property + def port(self): + """Port number associated with self.__listener_socket.""" + return self.__listener_socket.getsockname()[1] + + def add_callback(self, callback: Callable, args): + """Add callback (method) that will be called after push notification is received.""" + self.__callbacks.append((callback, args)) + + # def remove_callback(self, callback: Callable): + # self.__callbacks.remove(callback) + + def clear_callbacks(self): + """Remove all callbacks.""" + self.__callbacks.clear() + + def start_listening(self, timeout=None): + """Start new thread listening optiSLang server port. + + Parameters + ---------- + timeout: float, opt + Listener socket timeout. + """ + self.__thread = threading.Thread( + target=self.__listen, + name=f"PyOptiSLang.TcpOslListener.{self.name}", + args=(timeout,), + daemon=True, + ) + self.__run_listening_thread = True + self.__thread.start() + + def stop_listening(self): + """Stop listening optiSLang server port.""" + self.__run_listening_thread = False + self.__thread = None + + def __init_listener_socket(self, host: str, port_range: Tuple[int, int]) -> socket.socket: + """Initialize listener. + + Parameters + ---------- + host: str + A string representation of an IPv4/v6 address or domain name. + port_range : Tuple[int, int] + Defines the port range for port listener. Defaults to ``None``. + + Returns + ------- + socket + Listener socket. + """ + self.__listener_socket = None + for port in range(port_range[0], port_range[1] + 1): + try: + self.__listener_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.__listener_socket.bind((host, port)) + self.__listener_socket.listen(5) + self._logger.debug("Listening on port: %d", port) + break + except IOError as ex: + if self.__listener_socket is not None: + self.__listener_socket.close() + self.__listener_socket = None + + def __listen(self, timeout=None) -> None: + """Listen to the optiSLang server. + + Parameters + ---------- + timeout: float, opt + Listener socket timeout. + """ + start_time = time.time() + if timeout is None: + timeout = self.__timeout + + while self.__run_listening_thread: + client = None + try: + self.__listener_socket.settimeout(_get_current_timeout(timeout, start_time)) + clientsocket, address = self.__listener_socket.accept() + self._logger.debug("Connection from %s has been established.", address) + + client = TcpClient(clientsocket) + message = client.receive_msg(timeout) + self._logger.debug("Received message from client: %s", message) + + response = json.loads(message) + client.send_msg("") + self.__execute_callbacks(response) + + except TimeoutError or socket.timeout: + self._logger.warning(f"Listener {self.uid} listening timed out.") + self.stop_listening() + break + except Exception as ex: + self._logger.warning(ex) + finally: + if client is not None: + client.disconnect() + + def __execute_callbacks(self, response): + """Execute all callback.""" + for callback, args in self.__callbacks: + callback(self, response, *args) + + def is_listening(self): + """Return True if listener is listening.""" + return self.is_initialized() and self.__thread is not None and self.__thread.is_alive() + + def join(self): + """Wait until self.__thread is finished.""" + if not self.is_listening(): + raise RuntimeError("Listener is not listening.") + self.__thread.join() + + def cleanup_notifications(self, timeout: float = 0.2): + """Cleanup previously unprocessed push notifications. + + Parameters + ---------- + timeout: float, opt + Listener socket timeout. Default value ``0.2``. + """ + while True: + client = None + try: + self.__listener_socket.settimeout(timeout) + clientsocket, address = self.__listener_socket.accept() + client = TcpClient(clientsocket) + message = client.receive_msg(timeout) + data_dict = json.loads(message) + self._logger.debug(f"CLEANUP: {data_dict}") + client.send_msg("") + except socket.timeout: + break + except Exception as ex: + self._logger.warning(ex) + finally: + if client is not None: + client.disconnect() + + class TcpOslServer(OslServer): """Class which provides access to optiSLang server using plain TCP/IP communication protocol. @@ -611,13 +871,26 @@ def __init__( self.__no_save = no_save self.__password = password self.__osl_process = None - self.__refresh = True - self.__refresh_interval = 10 + self.__listeners = {} + self.__listeners_registration_thread = None + self.__refresh_listeners = threading.Event() + self.__listeners_refresh_interval = 20 if self.__host is None or self.__port is None: + self.__host = self.__class__._LOCALHOST self._start_local(ini_timeout) else: - self._connect_to_remote() + listener = self.__create_listener( + timeout=self.__timeout, + name="Main", + ) + listener.uid = self.__register_listener( + host=listener.host, + port=listener.port, + notifications=[ServerNotification.ALL], + ) + self.__listeners["main"] = listener + self.__start_listeners_registration_thread() def _get_server_info(self) -> Dict: """Get information about the application, the server configuration and the open projects. @@ -1026,9 +1299,9 @@ def set_timeout(self, timeout: Union[float, None] = None) -> None: timeout: Union[float, None] Timeout in seconds to perform commands, it must be greater than zero or ``None``. Another functions will raise a timeout exception if the timeout period value has - elapsed before the operation has completed. If ``None`` is given, functions - will wait until they're finished (no timeout exception is raised). - Defaults to ``None``. + elapsed before the operation has completed. + If ``None`` is given, functions will wait until they're finished (no timeout + exception is raised). Defaults to ``None``. Raises ------ @@ -1057,6 +1330,9 @@ def set_timeout(self, timeout: Union[float, None] = None) -> None: f"``None`` but {type(timeout)} was given." ) + for listener in self.__listeners.values(): + listener.timeout = timeout + def shutdown(self, force: bool = False) -> None: """Shutdown the server. @@ -1083,21 +1359,21 @@ def shutdown(self, force: bool = False) -> None: TimeoutError Raised when the parameter force is ``False`` and the timeout float value expires. """ - # Don't refresh listener registration - self.__refresh = False - self.__refresh_thread.join() - - # Unregister listener - try: - self._unregister_listener(str(self.__listener_uid)) - except Exception as ex: - self._logger.warn("Cannot unregister port listener: %s", ex) - - if self.__listener_socket is not None: - self.__listener_socket.close() + self.__stop_listeners_registration_thread() + # Unregister listeners and close its sockets + for listener in self.__listeners.values(): + if listener.uid is not None: + try: + self._unregister_listener(listener) + except Exception as ex: + self._logger.warn("Cannot unregister port listener: %s", ex) + if listener.is_listening(): + listener.dispose() # Only in case shutdown_on_finished option is not set, actively send shutdown command - if not self.__osl_process.shutdown_on_finished: + if self.__osl_process is None or ( + self.__osl_process is not None and not self.__osl_process.shutdown_on_finished + ): try: self._send_command(commands.shutdown(self.__password)) except Exception: @@ -1154,47 +1430,64 @@ def start(self, wait_for_started: bool = True, wait_for_finished: bool = True) - TimeoutError Raised when the timeout float value expires. """ - self.__cleanup_notifications(self.__listener_socket) - successfully_started = False - successfully_started_queue = Queue() already_running = False if self.get_project_status() == "PROCESSING": already_running = True + self._logger.debug("Status PROCESSING") if not already_running and (wait_for_started or wait_for_finished): - start_listener_thread = threading.Thread( - target=self.__wait_for_started, - name="PyOptiSLang.OslPortListener", - args=(self.__listener_socket, self.__timeout, successfully_started_queue), - daemon=True, + exec_started_listener = self.__listeners.get("exec_started_listener", None) + if exec_started_listener is None: + exec_started_listener = self.__create_exec_started_listener() + exec_started_listener.cleanup_notifications() + wait_for_started_queue = Queue() + exec_started_listener.add_callback( + self.__class__.__terminate_listener_thread, + ( + [ServerNotification.EXECUTION_STARTED.name], + wait_for_started_queue, + self._logger, + ), ) - start_listener_thread.start() - self._logger.debug("Thread for listening push notifications created.") + exec_started_listener.start_listening() + self._logger.debug("Wait for started thread was created.") if wait_for_finished: - stop_listener_thread = threading.Thread( - target=self.__wait_for_finish, - name="PyOptiSLang.OslPortListener", - args=(self.__listener_socket, self.__timeout), - daemon=True, + exec_finished_listener = self.__listeners.get("exec_finished_listener", None) + if exec_finished_listener is None: + exec_finished_listener = self.__create_exec_finished_listener() + exec_finished_listener.cleanup_notifications() + wait_for_finished_queue = Queue() + exec_finished_listener.add_callback( + self.__class__.__terminate_listener_thread, + ( + [ + ServerNotification.EXECUTION_FINISHED.name, + ServerNotification.NOTHING_PROCESSED.name, + ], + wait_for_finished_queue, + self._logger, + ), ) - stop_listener_thread.start() - self._logger.debug("Thread for listening push notifications created.") + exec_finished_listener.start_listening() + self._logger.debug("Wait for finished thread was created.") if not already_running: self._send_command(commands.start(self.__password)) if not already_running and (wait_for_started or wait_for_finished): self._logger.info(f"Waiting for started") - start_listener_thread.join() - successfully_started = successfully_started_queue.get() - self._logger.info(f"successfully_started: {successfully_started}") + successfully_started = wait_for_started_queue.get() + self._logger.info(f"Successfully_started: {successfully_started}.") if wait_for_finished and (successfully_started or already_running): self._logger.info(f"Waiting for finished") - stop_listener_thread.join() + successfully_finished = wait_for_finished_queue.get() + self._logger.info(f"Successfully_finished: {successfully_finished}.") + else: + time.sleep(1) def stop(self, wait_for_finished: bool = True) -> None: """Stop project execution. @@ -1216,35 +1509,36 @@ def stop(self, wait_for_finished: bool = True) -> None: TimeoutError Raised when the timeout float value expires. """ - self.__cleanup_notifications(self.__listener_socket) - status = self.get_project_status() - stopped_states = [ - "IDLE", - "FINISHED", - "STOP_REQUESTED", - "STOPPED", - "ABORT_REQUESTED", - "ABORTED", - ] - - if status not in stopped_states: + if not self._is_status_in_stopped_states(status): if wait_for_finished: - listener_thread = threading.Thread( - target=self.__wait_for_finish, - name="PyOptiSLang.OslPortListener", - args=(self.__listener_socket, self.__timeout), - daemon=True, + exec_finished_listener = self.__listeners.get("exec_finished_listener", None) + if exec_finished_listener is None: + exec_finished_listener = self.__create_exec_finished_listener() + exec_finished_listener.cleanup_notifications() + wait_for_finished_queue = Queue() + exec_finished_listener.add_callback( + self.__class__.__terminate_listener_thread, + ( + [ + ServerNotification.EXECUTION_FINISHED.name, + ServerNotification.NOTHING_PROCESSED.name, + ], + wait_for_finished_queue, + self._logger, + ), ) - listener_thread.start() - self._logger.debug("Thread for listening push notifications created.") + exec_finished_listener.start_listening() + self._logger.debug("Wait for finished thread was created.") self._send_command(commands.stop(self.__password)) if wait_for_finished: self._logger.info(f"Waiting for finished") - listener_thread.join() + # exec_finished_listener.join() + successfully_finished = wait_for_finished_queue.get() + self._logger.info(f"Successfully_finished: {successfully_finished}.") else: self._logger.debug(f"Do not send STOP request, project status is: {status}") @@ -1268,44 +1562,71 @@ def stop_gently(self, wait_for_finished: bool = True) -> None: TimeoutError Raised when the timeout float value expires. """ - self.__cleanup_notifications(self.__listener_socket) - status = self.get_project_status() - stopped_states = [ - "IDLE", - "FINISHED", - "STOP_REQUESTED", - "STOPPED", - "ABORT_REQUESTED", - "ABORTED", - ] - if status not in stopped_states: + if not self._is_status_in_stopped_states(status): if wait_for_finished: - listener_thread = threading.Thread( - target=self.__wait_for_finish, - name="PyOptiSLang.OslPortListener", - args=(self.__listener_socket, self.__timeout), - daemon=True, + exec_finished_listener = self.__listeners.get("exec_finished_listener", None) + if exec_finished_listener is None: + exec_finished_listener = self.__create_exec_finished_listener() + exec_finished_listener.cleanup_notifications() + wait_for_finished_queue = Queue() + exec_finished_listener.add_callback( + self.__class__.__terminate_listener_thread, + ( + [ + ServerNotification.EXECUTION_FINISHED.name, + ServerNotification.NOTHING_PROCESSED.name, + ], + wait_for_finished_queue, + self._logger, + ), ) - listener_thread.start() - self._logger.debug("Thread for listening push notifications created.") + exec_finished_listener.start_listening() + self._logger.debug("Wait for finished thread was created.") self._send_command(commands.stop_gently(self.__password)) if wait_for_finished: self._logger.info(f"Waiting for finished") - listener_thread.join() + # exec_finished_listener.join() + successfully_finished = wait_for_finished_queue.get() + self._logger.info(f"Successfully_finished: {successfully_finished}.") else: self._logger.debug(f"Do not send STOP_GENTLY request, project status is: {status}") - def _unregister_listener(self, uuid: str) -> None: + def terminate_server_threads(self) -> None: + """Terminate all local threads created by this instance.""" + self.__stop_listeners_registration_thread() + # Unregister listeners and close its sockets + for listener in self.__listeners.values(): + if listener.uid is not None: + try: + self._unregister_listener(listener) + except Exception as ex: + self._logger.warn("Cannot unregister port listener: %s", ex) + if listener.is_listening(): + listener.dispose() + + def _is_status_in_stopped_states(self, status: str) -> bool: + """Compare current project status with list.""" + stopped_states = [ + "IDLE", + "FINISHED", + "STOP_REQUESTED", + "STOPPED", + "ABORT_REQUESTED", + "ABORTED", + ] + return status in stopped_states + + def _unregister_listener(self, listener: TcpOslListener) -> None: """Unregister a listener. Parameters ---------- - uuid : str - Specific unique ID for the TCP listener. + listener : TcpOslListener + Class with listener properties. Raises ------ @@ -1316,9 +1637,10 @@ def _unregister_listener(self, uuid: str) -> None: TimeoutError Raised when the timeout float value expires. """ - self._send_command(commands.unregister_listener(uuid, self.__password)) + self._send_command(commands.unregister_listener(str(listener.uid), self.__password)) + listener.uid = None - def _start_local(self, ini_timeout: float) -> None: + def _start_local(self, ini_timeout: float) -> TcpOslListener: """Start local optiSLang server. Parameters @@ -1339,41 +1661,34 @@ def _start_local(self, ini_timeout: float) -> None: if self.__osl_process is not None: raise RuntimeError("optiSLang server is already started.") - self.__listener_socket = None - self.__listener_uid = str(uuid.uuid4()) + listener = self.__create_listener( + uid=str(uuid.uuid4()), timeout=self.__timeout, name="Main" + ) + port_queue = Queue() + listener.add_callback(self.__class__.__port_on_listended, (port_queue, self._logger)) try: - self.__listener_socket = self.__init_listener( - self.__class__._LOCALHOST, - self.__class__._PRIVATE_PORTS_RANGE, - ) - if self.__listener_socket is None: - raise RuntimeError("Cannot start listener of optiSLang server port.") - - listener_thread = threading.Thread( - target=self.__listen_port, - name="PyOptiSLang.OslPortListener", - args=(self.__listener_socket, ini_timeout), - daemon=True, - ) - listener_thread.start() + listener.start_listening(timeout=ini_timeout) self.__osl_process = OslServerProcess( - self.__executable, + executable=self.__executable, project_path=self.__project_path, no_save=self.__no_save, password=self.__password, - listener=(self.__class__._LOCALHOST, self.__listener_socket.getsockname()[1]), - listener_id=self.__listener_uid, + listener=(listener.host, listener.port), + listener_id=listener.uid, + notifications=[ServerNotification.ALL], + shutdown_on_finished=True, logger=self._logger, ) self.__osl_process.start() - listener_thread.join() + listener.join() + if not port_queue.empty(): + self.__port = port_queue.get() except Exception: - if self.__listener_socket is not None: - self.__listener_socket.close() + listener.dispose() raise finally: @@ -1382,24 +1697,22 @@ def _start_local(self, ini_timeout: float) -> None: self.__osl_process = None raise RuntimeError("Cannot get optiSLang server port.") - self.__host = self.__class__._LOCALHOST + self.__listeners["main_listener"] = listener + self.__start_listeners_registration_thread() - # subscribe listener for push notifications and create new thread - # for refresing listener registration - self.__subscribe_for_push_notifications( - uid=self.__listener_uid, - notifications=[ - "EXECUTION_STARTED", - "EXECUTION_FINISHED", - "NOTHING_PROCESSED", - "CHECK_FAILED", - "EXEC_FAILED", - ], - ) - self.__create_refresh_thread() + def __create_listener(self, timeout: float, name: str, uid: str = None) -> TcpOslListener: + """Create new listener. - def _connect_to_remote(self) -> None: - """Connect to remote optiSLang server. + Parameters + ---------- + timeout: float + Timeout. + Uid: str + Listener uid. + + Returns + ------- + TcpOslListener Raises ------ @@ -1410,152 +1723,94 @@ def _connect_to_remote(self) -> None: -or- optiSLang server port is not listened for specified timeout value. """ - self.__listener_socket = None - - self.__listener_socket = self.__init_listener( - socket.gethostbyname(socket.gethostname()), - self.__class__._PRIVATE_PORTS_RANGE, + listener = TcpOslListener( + port_range=self.__class__._PRIVATE_PORTS_RANGE, + timeout=timeout, + name=name, + host=self.__host, + uid=uid, + logger=self._logger, ) - if self.__listener_socket is None: + + if not listener.is_initialized(): raise RuntimeError("Cannot start listener of optiSLang server port.") - # TODO: another way of getting local_ip, hostname doesn't have to be defined? - self.__listener_uid = self.__register_listener( - host=socket.gethostbyname(socket.gethostname()), - port=self.__listener_socket.getsockname()[1], + return listener + + def __create_exec_started_listener(self) -> TcpOslListener: + """Create exec_started listener and add to self.__listeners.""" + exec_started_listener = self.__create_listener( + timeout=self.__timeout, + name="ExecStarted", + ) + exec_started_listener.uid = self.__register_listener( + host=exec_started_listener.host, + port=exec_started_listener.port, notifications=[ - "EXECUTION_STARTED", - "EXECUTION_FINISHED", - "NOTHING_PROCESSED", - "CHECK_FAILED", - "EXEC_FAILED", + ServerNotification.EXECUTION_STARTED, + ServerNotification.EXEC_FAILED, + ServerNotification.CHECK_FAILED, ], ) - - self.__create_refresh_thread() - - def __init_listener(self, host: str, port_range: Tuple[int, int]) -> socket.socket: - """Initialize listener. - - Parameters - ---------- - host: str - A string representation of an IPv4/v6 address or domain name. - port_range : Tuple[int, int] - Defines the port range for port listener. Defaults to ``None``. - - Returns - ------- - socket - Listener socket. - """ - listener_socket = None - for port in range(port_range[0], port_range[1] + 1): - try: - listener_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - listener_socket.bind((host, port)) - listener_socket.listen(5) - self._logger.debug("Listening on port: %d", port) - break - except IOError as ex: - if listener_socket is not None: - listener_socket.close() - listener_socket = None - - return listener_socket - - def __listen_port(self, listener_socket: socket.socket, timeout: float) -> None: - """Listen to the optiSLang server port. - - Parameters - ---------- - listener_socket : socket.socket - Socket of the port listener. - timeout : float - Timeout in seconds for listening port. - """ - listener_socket.settimeout(timeout) - start_time = datetime.now() - while True: - if (datetime.now() - start_time).seconds > timeout: - self._logger.info("OptiSLang server port listening timed out.") - break - - client = None - try: - clientsocket, address = listener_socket.accept() - self._logger.debug("Connection from %s has been established.", address) - - client = TcpClient(clientsocket) - message = client.receive_msg() - self._logger.debug("Received message from client: %s", message) - - data_dict = json.loads(message) - self.__port = int(data_dict["port"]) - self._logger.info("optiSLang server port has been received: %d", self.__port) - - client.send_msg("") - break - except Exception as ex: - self._logger.warning(ex) - finally: - if client is not None: - client.disconnect() - - def __create_refresh_thread(self) -> None: - """Create new thread for refreshing of listener registration.""" - self.__refresh_thread = threading.Thread( - target=self.__refresh_listener_registration, args=(), daemon=True + self.__listeners["exec_started_listener"] = exec_started_listener + return exec_started_listener + + def __create_exec_finished_listener(self) -> TcpOslListener: + """Create exec_finished listener and add to self.__listeners.""" + exec_finished_listener = self.__create_listener( + timeout=self.__timeout, + name="ExecFinished", ) - self.__refresh_thread.start() - - def __subscribe_for_push_notifications( - self, - uid: str, - notifications: Iterable[str], - ) -> None: - """Subscribe listener for push notifications. - - Parameters - ---------- - uid : str - Specific unique ID for the TCP listener. - notifications: Iterable[str], optional - Either ["ALL"] or Sequence picked from below options: - Server: [ "SERVER_UP", "SERVER_DOWN" ] (always be sent by default). - Logging: [ "LOG_INFO", "LOG_WARNING", "LOG_ERROR", "LOG_DEBUG" ]. - Project: [ "EXECUTION_STARTED", "EXECUTION_FINISHED", "NOTHING_PROCESSED", - "CHECK_FAILED", "EXEC_FAILED" ]. - Nodes: [ "ACTOR_STATE_CHANGED", "ACTOR_ACTIVE_CHANGED", "ACTOR_NAME_CHANGED", - "ACTOR_CONTENTS_CHANGED", "ACTOR_DATA_CHANGED" ]. - - Raises - ------ - OslCommunicationError - Raised when an error occurs while communicating with server. - OslCommandError - Raised when the command or query fails. - TimeoutError - Raised when the timeout float value expires. - """ - self._send_command( - commands.subscribe_for_push_notifications( - uid=uid, - notifications=notifications, - password=self.__password, - ) + exec_finished_listener.uid = self.__register_listener( + host=exec_finished_listener.host, + port=exec_finished_listener.port, + notifications=[ + ServerNotification.EXECUTION_FINISHED, + ServerNotification.NOTHING_PROCESSED, + ServerNotification.EXEC_FAILED, + ServerNotification.CHECK_FAILED, + ], + ) + self.__listeners["exec_finished_listener"] = exec_finished_listener + return exec_finished_listener + + def __start_listeners_registration_thread(self) -> None: + """Create new thread for refreshing of listeners registrations.""" + self.__listeners_registration_thread = threading.Thread( + target=self.__refresh_listeners_registration, + name="PyOptiSLang.ListenersRegistrationThread", + args=(), + daemon=True, ) + self.__refresh_listeners.set() + self.__listeners_registration_thread.start() + + def __stop_listeners_registration_thread(self) -> None: + """Stop listeners registration thread.""" + if self.__listeners_registration_thread and self.__listeners_registration_thread.is_alive(): + self.__refresh_listeners.clear() + self.__listeners_registration_thread.join() + self._logger.debug("Listener registration thread stopped.") - def __register_listener(self, host: str, port: int, notifications: Iterable[str] = None) -> str: + def __register_listener( + self, + host: str, + port: int, + timeout: int = 60000, + notifications: List[ServerNotification] = None, + ) -> str: """Register a client, returning a reference ID. Parameters ---------- host : str - A string representation of an IPv4/v6 address or domain name. + A string representation of an IPv4/v6 address or domain name. port: int A numeric port number of listener. - notifications: Iterable[str], optional + timeout: float + Listener will remain active for ``timeout`` ms unless refreshed. + + notifications: Iterable[ServerNotification], optional Either ["ALL"] or Sequence picked from below options: Server: [ "SERVER_UP", "SERVER_DOWN" ] (always be sent by default). Logging: [ "LOG_INFO", "LOG_WARNING", "LOG_ERROR", "LOG_DEBUG" ]. @@ -1577,14 +1832,15 @@ def __register_listener(self, host: str, port: int, notifications: Iterable[str] commands.register_listener( host=host, port=port, - notifications=notifications, + timeout=timeout, + notifications=[ntf.name for ntf in notifications], password=self.__password, ) ) return msg[0]["uid"] - def __refresh_listener_registration(self): - """Refresh listener registration. + def __refresh_listeners_registration(self) -> None: + """Refresh listeners registration. Raises ------ @@ -1599,13 +1855,14 @@ def __refresh_listener_registration(self): """ check_for_refresh = 0.5 counter = 0 - while self.__refresh: - if counter >= self.__refresh_interval: - response = self._send_command( - commands.refresh_listener_registration( - uid=self.__listener_uid, password=self.__password + while self.__refresh_listeners.is_set(): + if counter >= self.__listeners_refresh_interval: + for listener in self.__listeners.values(): + response = self._send_command( + commands.refresh_listener_registration( + uid=listener.uid, password=self.__password + ) ) - ) counter = 0 counter += check_for_refresh time.sleep(check_for_refresh) @@ -1694,97 +1951,39 @@ def __check_command_response(response: Dict) -> None: message = "Command error: " + str(response) raise OslCommandError(message) - def __wait_for_started( - self, listener_socket: socket.socket, timeout: float, successfully_started: Queue + @staticmethod + def __port_on_listended( + sender: TcpOslListener, response: dict, port_queue: Queue, logger ) -> None: - """Listen to the optiSLang server port. - - Parameters - ---------- - listener_socket : socket.socket - Socket of the port listener. - timeout : float - Timeout in seconds for listening port. - """ - listener_socket.settimeout(timeout) - while True: - client = None - try: - clientsocket, address = listener_socket.accept() - client = TcpClient(clientsocket) - message = client.receive_msg(timeout) - data_dict = json.loads(message) - self._logger.debug(f"PUSH NOTIFICATION: {data_dict}") - client.send_msg("") - if data_dict["type"] == "EXECUTION_STARTED": - successfully_started.put(True) - break - elif data_dict["type"] in [ - "CHECK_FAILED", - "EXEC_FAILED", - ]: - successfully_started.put(False) - break - except Exception as ex: - successfully_started.put(False) - self._logger.warning(ex) - finally: - if client is not None: - client.disconnect() - - def __wait_for_finish(self, listener_socket: socket.socket, timeout: float) -> None: - """Listen to the optiSLang server port. - - Parameters - ---------- - listener_socket : socket.socket - Socket of the port listener. - timeout : float - Timeout in seconds for listening port. - """ - listener_socket.settimeout(timeout) - while True: - client = None - try: - clientsocket, address = listener_socket.accept() - client = TcpClient(clientsocket) - message = client.receive_msg(timeout) - data_dict = json.loads(message) - self._logger.debug(f"PUSH NOTIFICATION: {data_dict}") - client.send_msg("") - if data_dict["type"] in [ - "NOTHING_PROCESSED", - "EXECUTION_FINISHED", - ]: - break - except Exception as ex: - self._logger.warning(ex) - finally: - if client is not None: - client.disconnect() - - def __cleanup_notifications(self, listener_socket: socket.socket) -> None: - """Cleanup previously unprocessed push notifications. + """Listen to the optiSLang server port.""" + try: + if "port" in response: + port = int(response["port"]) + port_queue.put(port) + sender.stop_listening() + sender.clear_callbacks() + except: + logger.debug("Port cannot be received from response: %s", str(response)) - Parameters - ---------- - listener_socket : socket.socket - Socket of the port listener. - """ - listener_socket.settimeout(0.2) - while True: - client = None - try: - clientsocket, address = listener_socket.accept() - client = TcpClient(clientsocket) - message = client.receive_msg(0.2) - data_dict = json.loads(message) - self._logger.debug(f"PUSH NOTIFICATION (cleanup): {data_dict}") - client.send_msg("") - except socket.timeout: - break - except Exception as ex: - self._logger.warning(ex) - finally: - if client is not None: - client.disconnect() + @staticmethod + def __terminate_listener_thread( + sender: TcpOslListener, + response: dict, + target_notifications: List[str], + target_queue: Queue, + logger, + ) -> None: + """Terminate listener thread if execution finished or failed.""" + type = response.get("type", None) + if type in [ServerNotification.EXEC_FAILED.name or ServerNotification.CHECK_FAILED.name]: + sender.stop_listening() + sender.clear_callbacks() + target_queue.put("False") + logger.error(f"Listener {sender.name} received error notification.") + elif type in target_notifications: + sender.stop_listening() + sender.clear_callbacks() + target_queue.put("True") + logger.debug(f"Listener {sender.name} received expected notification.") + elif type is None: + logger.error("Invalid response from server, push notification not evaluated.") diff --git a/tests/test_optislang.py b/tests/test_optislang.py index 1c6009556..8da5634bc 100644 --- a/tests/test_optislang.py +++ b/tests/test_optislang.py @@ -146,13 +146,12 @@ def test_stop(optislang): with does_not_raise() as dnr: optislang.run_python_script( r""" -import time from py_os_design import * sens = actors.SensitivityActor("Sensitivity") add_actor(sens) python = actors.PythonActor('python_sleep') sens.add_actor(python) -python.source = 'time.sleep(1)\noutput_value = input_value*2' +python.source = 'import time\ntime.sleep(0.1)\noutput_value = input_value*2' python.add_parameter("input_value", PyOSDesignEntry(5.0)) python.add_response(("output_value", PyOSDesignEntry(10))) connect(sens, "IODesign", python, "IDesign") @@ -160,7 +159,6 @@ def test_stop(optislang): """ ) optislang.start(wait_for_finished=False) - print(optislang.get_project_status()) optislang.stop() optislang.start() optislang.stop() @@ -175,13 +173,12 @@ def test_stop_gently(optislang): with does_not_raise() as dnr: optislang.run_python_script( r""" -import time from py_os_design import * sens = actors.SensitivityActor("Sensitivity") add_actor(sens) python = actors.PythonActor('python_sleep') sens.add_actor(python) -python.source = 'time.sleep(1)\noutput_value = input_value*2' +python.source = 'import time\ntime.sleep(0.1)\noutput_value = input_value*2' python.add_parameter("input_value", PyOSDesignEntry(5.0)) python.add_response(("output_value", PyOSDesignEntry(10))) connect(sens, "IODesign", python, "IDesign") @@ -189,7 +186,6 @@ def test_stop_gently(optislang): """ ) optislang.start(wait_for_finished=False) - print(optislang.get_project_status()) optislang.stop_gently() optislang.start() optislang.stop() diff --git a/tests/test_tcp_osl_server.py b/tests/test_tcp_osl_server.py index 84cce2d72..ab27e0ec0 100644 --- a/tests/test_tcp_osl_server.py +++ b/tests/test_tcp_osl_server.py @@ -1,5 +1,6 @@ from contextlib import nullcontext as does_not_raise import os +import socket import time import pytest @@ -7,8 +8,10 @@ from ansys.optislang.core import OslServerProcess import ansys.optislang.core.tcp_osl_server as tos -_host = "127.0.0.1" +_host = socket.gethostbyname(socket.gethostname()) _port = 5310 + + _msg = '{ "What": "SYSTEMS_STATUS_INFO" }' pytestmark = pytest.mark.local_osl @@ -16,10 +19,11 @@ @pytest.fixture(scope="function", autouse=True) def osl_server_process(): - # Will be executed before the first test - osl_server_process = OslServerProcess() + time.sleep(2) + # Will be executed before each test + osl_server_process = OslServerProcess(shutdown_on_finished=False) osl_server_process.start() - time.sleep(3) + time.sleep(4) return osl_server_process @@ -53,11 +57,11 @@ def tcp_osl_server() -> tos.TcpOslServer: Class which provides access to optiSLang server using plain TCP/IP communication protocol. """ tcp_osl_server = tos.TcpOslServer(host=_host, port=_port) - tcp_osl_server.set_timeout(timeout=3) + tcp_osl_server.set_timeout(timeout=8) return tcp_osl_server -## TcpClient +# TcpClient def test_connect_and_disconnect(osl_server_process, tcp_client): "Test ``connect``." with does_not_raise() as dnr: @@ -114,7 +118,7 @@ def test_receive_file(osl_server_process, tcp_client, tmp_path): assert dnr is None -## TcpOslServer +# TcpOslServer def test_get_server_info(osl_server_process, tcp_osl_server): """Test ``_get_server_info``.""" server_info = tcp_osl_server._get_server_info() From a9bf6f26febbfb47fb5a649dbbda4ca83fdf382a Mon Sep 17 00:00:00 2001 From: PanekOndrej Date: Fri, 14 Oct 2022 12:33:02 +0200 Subject: [PATCH 02/10] Edit docstrings, log -> property, shutdown_on_finished, start/stop tests --- src/ansys/optislang/core/optislang.py | 35 ++---- src/ansys/optislang/core/osl_process.py | 2 +- src/ansys/optislang/core/osl_server.py | 5 - src/ansys/optislang/core/tcp_osl_server.py | 128 +++++++++++++-------- tests/test_start_stop_combinations.py | 46 ++++++++ 5 files changed, 140 insertions(+), 76 deletions(-) create mode 100644 tests/test_start_stop_combinations.py diff --git a/src/ansys/optislang/core/optislang.py b/src/ansys/optislang/core/optislang.py index 8a2f6ca34..6bf2a3065 100644 --- a/src/ansys/optislang/core/optislang.py +++ b/src/ansys/optislang/core/optislang.py @@ -52,6 +52,9 @@ class Optislang: - WARNING: Log some oddities or potential problems. - INFO: Log some useful information that program works as expected. - DEBUG: The most grained logging. + shutdown_on_finished: bool, optional + Shut down when execution is finished and there are not any listeners registered. + It is ignored when the host and port parameters are specified. Defaults to ``True``. Raises ------ @@ -81,6 +84,7 @@ def __init__( name: str = None, password: str = None, loglevel: str = None, + shutdown_on_finished: bool = True, ) -> None: """Initialize a new instance of the ``Optislang`` class.""" self.__host = host @@ -91,8 +95,8 @@ def __init__( self.__ini_timeout = ini_timeout self.__name = name self.__password = password - - self.log = LOG.add_instance_logger(self.name, self, loglevel) + self.__shutdown_on_finished = shutdown_on_finished + self.__logger = LOG.add_instance_logger(self.name, self, loglevel) self.__osl_server: OslServer = self.__init_osl_server("tcp") def __init_osl_server(self, server_type: str) -> OslServer: @@ -125,6 +129,7 @@ def __init_osl_server(self, server_type: str) -> OslServer: ini_timeout=self.__ini_timeout, password=self.__password, logger=self.log, + shutdown_on_finished=self.__shutdown_on_finished, ) else: raise NotImplementedError(f'OptiSLang server of type "{server_type}" is not supported.') @@ -137,13 +142,6 @@ def __str__(self): f"PyOptiSLang: {version('ansys.optislang.core')}" ) - def __del__(self): - """Shutdown optiSLang.""" - if self.__host and self.__port: - self.shutdown() - else: - self.terminate_server_threads() - @property def name(self) -> str: """Instance unique identifier.""" @@ -154,6 +152,11 @@ def name(self) -> str: self.__name = f"optiSLang_{id(self)}" return self.__name + @property + def log(self): + """Return instance logger.""" + return self.__logger + def get_osl_version(self) -> str: """Get version of used optiSLang. @@ -517,17 +520,3 @@ def stop_gently(self, wait_for_finished: bool = True) -> None: Raised when the timeout float value expires. """ self.__osl_server.stop_gently(wait_for_finished) - - def terminate_server_threads(self) -> None: - """Terminate all local threads created by self.__osl_server. - - Raises - ------ - OslCommunicationError - Raised when an error occurs while communicating with server. - OslCommandError - Raised when the command or query fails. - TimeoutError - Raised when the timeout float value expires. - """ - self.__osl_server.terminate_server_threads() diff --git a/src/ansys/optislang/core/osl_process.py b/src/ansys/optislang/core/osl_process.py index 3aa2b2c6a..e17441698 100644 --- a/src/ansys/optislang/core/osl_process.py +++ b/src/ansys/optislang/core/osl_process.py @@ -75,7 +75,7 @@ class OslServerProcess: notifications : Iterable[ServerNotification], optional Notifications to be sent to the listener. Defaults to ``None``. shutdown_on_finished: bool, optional - Shut down when execution is finished. Defaults to ``False``. + Shut down when execution is finished. Defaults to ``True``. env_vars : Mapping[str, str], optional Additional environmental variables (key and value) for the optiSLang server process. diff --git a/src/ansys/optislang/core/osl_server.py b/src/ansys/optislang/core/osl_server.py index 797cea214..39f94c664 100644 --- a/src/ansys/optislang/core/osl_server.py +++ b/src/ansys/optislang/core/osl_server.py @@ -484,8 +484,3 @@ def stop_gently(self, wait_for_finished: bool = True) -> None: Raised when the timeout float value expires. """ pass - - @abstractmethod - def terminate_server_threads(self) -> None: - """Terminate all local threads created by this instance.""" - pass diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index c56410f36..f3f465793 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -532,10 +532,10 @@ class TcpOslListener: Name of listener. host: str Local IPv6 address. - uid: str, opt + uid: str, optional Unique ID of listener, should be used only if listener is used for optiSLangs port when started locally. - logger: OslLogger, opt + logger: OslLogger, optional Preferably OslLogger should be given. If not given, default logging.Logger is used. Raises @@ -606,55 +606,63 @@ def dispose(self) -> None: self.__listener_socket.close() @property - def uid(self): + def uid(self) -> str: """Instance unique identifier.""" return self.__uid @uid.setter - def uid(self, uid): + def uid(self, uid) -> None: self.__uid = uid @property - def name(self): + def name(self) -> str: """Instance name used for naming self.__thread.""" return self.__name @property - def timeout(self): + def timeout(self) -> Union[float, None]: """Timeout in seconds to receive a message.""" return self.__timeout @timeout.setter - def timeout(self, timeout): + def timeout(self, timeout) -> None: self.__timeout = timeout @property - def host(self): + def host(self) -> str: """Local IPv6 address associated with self.__listener_socket.""" return self.__listener_socket.getsockname()[0] @property - def port(self): + def port(self) -> int: """Port number associated with self.__listener_socket.""" return self.__listener_socket.getsockname()[1] - def add_callback(self, callback: Callable, args): - """Add callback (method) that will be called after push notification is received.""" + def add_callback(self, callback: Callable, args) -> None: + """Add callback (method) that will be called after push notification is received. + + Parameters + ---------- + callback: Callable + Method or any callable that will be called when listener receives message. + args: + Arguments to the callback. + """ self.__callbacks.append((callback, args)) # def remove_callback(self, callback: Callable): # self.__callbacks.remove(callback) - def clear_callbacks(self): + def clear_callbacks(self) -> None: """Remove all callbacks.""" self.__callbacks.clear() - def start_listening(self, timeout=None): + def start_listening(self, timeout=None) -> None: """Start new thread listening optiSLang server port. Parameters ---------- - timeout: float, opt + timeout: float, optional Listener socket timeout. """ self.__thread = threading.Thread( @@ -666,12 +674,12 @@ def start_listening(self, timeout=None): self.__run_listening_thread = True self.__thread.start() - def stop_listening(self): + def stop_listening(self) -> None: """Stop listening optiSLang server port.""" self.__run_listening_thread = False self.__thread = None - def __init_listener_socket(self, host: str, port_range: Tuple[int, int]) -> socket.socket: + def __init_listener_socket(self, host: str, port_range: Tuple[int, int]) -> None: """Initialize listener. Parameters @@ -680,11 +688,6 @@ def __init_listener_socket(self, host: str, port_range: Tuple[int, int]) -> sock A string representation of an IPv4/v6 address or domain name. port_range : Tuple[int, int] Defines the port range for port listener. Defaults to ``None``. - - Returns - ------- - socket - Listener socket. """ self.__listener_socket = None for port in range(port_range[0], port_range[1] + 1): @@ -704,7 +707,7 @@ def __listen(self, timeout=None) -> None: Parameters ---------- - timeout: float, opt + timeout: float, optional Listener socket timeout. """ start_time = time.time() @@ -736,27 +739,27 @@ def __listen(self, timeout=None) -> None: if client is not None: client.disconnect() - def __execute_callbacks(self, response): + def __execute_callbacks(self, response) -> None: """Execute all callback.""" for callback, args in self.__callbacks: callback(self, response, *args) - def is_listening(self): + def is_listening(self) -> None: """Return True if listener is listening.""" return self.is_initialized() and self.__thread is not None and self.__thread.is_alive() - def join(self): + def join(self) -> None: """Wait until self.__thread is finished.""" if not self.is_listening(): raise RuntimeError("Listener is not listening.") self.__thread.join() - def cleanup_notifications(self, timeout: float = 0.2): + def cleanup_notifications(self, timeout: float = 0.2) -> None: """Cleanup previously unprocessed push notifications. Parameters ---------- - timeout: float, opt + timeout: float, optional Listener socket timeout. Default value ``0.2``. """ while True: @@ -814,6 +817,9 @@ class TcpOslServer(OslServer): to contain a password entry. Defaults to ``None``. logger : Any, optional Object for logging. If ``None``, standard logging object is used. Defaults to ``None``. + shutdown_on_finished: bool, optional + Shut down when execution is finished and there are not any listeners registered. + It is ignored when the host and port parameters are specified. Defaults to ``True``. Raises ------ @@ -855,6 +861,7 @@ def __init__( ini_timeout: float = 20, password: str = None, logger=None, + shutdown_on_finished=True, ) -> None: """Initialize a new instance of the ``TcpOslServer`` class.""" self.__host = host @@ -878,7 +885,7 @@ def __init__( if self.__host is None or self.__port is None: self.__host = self.__class__._LOCALHOST - self._start_local(ini_timeout) + self._start_local(ini_timeout, shutdown_on_finished) else: listener = self.__create_listener( timeout=self.__timeout, @@ -1595,19 +1602,6 @@ def stop_gently(self, wait_for_finished: bool = True) -> None: else: self._logger.debug(f"Do not send STOP_GENTLY request, project status is: {status}") - def terminate_server_threads(self) -> None: - """Terminate all local threads created by this instance.""" - self.__stop_listeners_registration_thread() - # Unregister listeners and close its sockets - for listener in self.__listeners.values(): - if listener.uid is not None: - try: - self._unregister_listener(listener) - except Exception as ex: - self._logger.warn("Cannot unregister port listener: %s", ex) - if listener.is_listening(): - listener.dispose() - def _is_status_in_stopped_states(self, status: str) -> bool: """Compare current project status with list.""" stopped_states = [ @@ -1640,14 +1634,16 @@ def _unregister_listener(self, listener: TcpOslListener) -> None: self._send_command(commands.unregister_listener(str(listener.uid), self.__password)) listener.uid = None - def _start_local(self, ini_timeout: float) -> TcpOslListener: + def _start_local(self, ini_timeout: float, shutdown_on_finished: bool) -> None: """Start local optiSLang server. Parameters ---------- - ini_timeout : float, optional + ini_timeout : float Time in seconds to listen to the optiSLang server port. If the port is not listened for specified time, the optiSLang server is not started and RuntimeError is raised. + shutdown_on_finished: bool + Shut down when execution is finished and there are not any listeners registered. Raises ------ @@ -1678,7 +1674,7 @@ def _start_local(self, ini_timeout: float) -> TcpOslListener: listener=(listener.host, listener.port), listener_id=listener.uid, notifications=[ServerNotification.ALL], - shutdown_on_finished=True, + shutdown_on_finished=shutdown_on_finished, logger=self._logger, ) self.__osl_process.start() @@ -1713,6 +1709,7 @@ def __create_listener(self, timeout: float, name: str, uid: str = None) -> TcpOs Returns ------- TcpOslListener + Listener ready to be registered to optiSLang server. Raises ------ @@ -1738,7 +1735,23 @@ def __create_listener(self, timeout: float, name: str, uid: str = None) -> TcpOs return listener def __create_exec_started_listener(self) -> TcpOslListener: - """Create exec_started listener and add to self.__listeners.""" + """Create exec_started listener and add to self.__listeners. + + Returns + ------- + exec_started_listener: TcpOslListener + Listener registered to the optiSLang server and subscribed + for push notifications. + + Raises + ------ + OslCommunicationError + Raised when an error occurs while communicating with server. + OslCommandError + Raised when the command or query fails. + TimeoutError + Raised when the timeout float value expires. + """ exec_started_listener = self.__create_listener( timeout=self.__timeout, name="ExecStarted", @@ -1756,7 +1769,23 @@ def __create_exec_started_listener(self) -> TcpOslListener: return exec_started_listener def __create_exec_finished_listener(self) -> TcpOslListener: - """Create exec_finished listener and add to self.__listeners.""" + """Create exec_finished listener and add to self.__listeners. + + Returns + ------- + exec_finished_listener: TcpOslListener + Listener registered to the optiSLang server and subscribed + for push notifications. + + Raises + ------ + OslCommunicationError + Raised when an error occurs while communicating with server. + OslCommandError + Raised when the command or query fails. + TimeoutError + Raised when the timeout float value expires. + """ exec_finished_listener = self.__create_listener( timeout=self.__timeout, name="ExecFinished", @@ -1775,7 +1804,7 @@ def __create_exec_finished_listener(self) -> TcpOslListener: return exec_finished_listener def __start_listeners_registration_thread(self) -> None: - """Create new thread for refreshing of listeners registrations.""" + """Create new thread for refreshing of listeners registrations and start it.""" self.__listeners_registration_thread = threading.Thread( target=self.__refresh_listeners_registration, name="PyOptiSLang.ListenersRegistrationThread", @@ -1819,6 +1848,11 @@ def __register_listener( Nodes: [ "ACTOR_STATE_CHANGED", "ACTOR_ACTIVE_CHANGED", "ACTOR_NAME_CHANGED", "ACTOR_CONTENTS_CHANGED", "ACTOR_DATA_CHANGED" ]. + Returns + ------- + str + Uid of registered listener created by optiSLang server. + Raises ------ OslCommunicationError diff --git a/tests/test_start_stop_combinations.py b/tests/test_start_stop_combinations.py new file mode 100644 index 000000000..054424703 --- /dev/null +++ b/tests/test_start_stop_combinations.py @@ -0,0 +1,46 @@ +"""Test different start/stop/stop_gently combinations with Optislang class.""" +from contextlib import nullcontext as does_not_raise + +import pytest + +from ansys.optislang.core import Optislang + +pytestmark = pytest.mark.local_osl + + +@pytest.fixture +def optislang(scope="function", autouse=True) -> Optislang: + """Create Optislang class. + + Returns + ------- + Optislang: + Connects to the optiSLang application and provides an API to control it. + """ + return Optislang() + + +@pytest.mark.parametrize( + "input, expected", + [ + ((("start", True, True), ("stop", True)), None), # default + ((("start", False, True), ("stop", True)), None), # don't wait for started -> also default + ((("start", True, True), ("stop", False)), None), # stop: don't wait for finished + ((("start", True, False), ("stop", True)), None), # start: don't wait for finished + ((("start", False, False), ("stop", False)), None), # all false + ((("start", True, True), ("stop", True), ("stop", True)), None), + ((("start", True, True), ("stop", True), ("start", True, True)), None), + ((("stop", True), ("stop", True), ("start", True, True)), None), + ((("start", True, True), ("start", True, True), ("start", True, True)), None), + ], +) +def test_combinations(optislang, input, expected): + "Test combinations." + with does_not_raise() as dnr: + for method in input: + if method[0] == "start": + optislang.start(method[1], method[2]) + if method[0] == "stop": + optislang.stop(method[1]) + optislang.shutdown() + assert dnr is expected From c96ca5196a2bd6b4f895e9693516d4e76848fe5f Mon Sep 17 00:00:00 2001 From: PanekOndrej Date: Tue, 18 Oct 2022 10:48:05 +0200 Subject: [PATCH 03/10] Typo in test_start_stop, _finish all threads. --- src/ansys/optislang/core/tcp_osl_server.py | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index f3f465793..43ada2fce 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -1366,16 +1366,7 @@ def shutdown(self, force: bool = False) -> None: TimeoutError Raised when the parameter force is ``False`` and the timeout float value expires. """ - self.__stop_listeners_registration_thread() - # Unregister listeners and close its sockets - for listener in self.__listeners.values(): - if listener.uid is not None: - try: - self._unregister_listener(listener) - except Exception as ex: - self._logger.warn("Cannot unregister port listener: %s", ex) - if listener.is_listening(): - listener.dispose() + self.__finish_all_threads() # Only in case shutdown_on_finished option is not set, actively send shutdown command if self.__osl_process is None or ( @@ -1902,6 +1893,18 @@ def __refresh_listeners_registration(self) -> None: time.sleep(check_for_refresh) self._logger.debug("Stop refreshing listener registration, self.__refresh = False") + def __finish_all_threads(self) -> None: + """Stop listeners registration and unregister them.""" + self.__stop_listeners_registration_thread() + for listener in self.__listeners.values(): + if listener.uid is not None: + try: + self._unregister_listener(listener) + except Exception as ex: + self._logger.warn("Cannot unregister port listener: %s", ex) + if listener.is_listening(): + listener.dispose() + def _send_command(self, command: str) -> Dict: """Send command or query to the optiSLang server. From aeffa9ec334e7faa5e70e32bdfa76889cd181140 Mon Sep 17 00:00:00 2001 From: PanekOndrej Date: Tue, 18 Oct 2022 10:48:41 +0200 Subject: [PATCH 04/10] Typo in test_start_stop, _finish all threads. --- tests/test_start_stop_combinations.py | 2 +- tests/test_tcp_osl_server.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_start_stop_combinations.py b/tests/test_start_stop_combinations.py index 054424703..aa4fd8127 100644 --- a/tests/test_start_stop_combinations.py +++ b/tests/test_start_stop_combinations.py @@ -9,7 +9,7 @@ @pytest.fixture -def optislang(scope="function", autouse=True) -> Optislang: +def optislang() -> Optislang: """Create Optislang class. Returns diff --git a/tests/test_tcp_osl_server.py b/tests/test_tcp_osl_server.py index ab27e0ec0..8d39d1625 100644 --- a/tests/test_tcp_osl_server.py +++ b/tests/test_tcp_osl_server.py @@ -23,7 +23,7 @@ def osl_server_process(): # Will be executed before each test osl_server_process = OslServerProcess(shutdown_on_finished=False) osl_server_process.start() - time.sleep(4) + time.sleep(5) return osl_server_process @@ -57,7 +57,7 @@ def tcp_osl_server() -> tos.TcpOslServer: Class which provides access to optiSLang server using plain TCP/IP communication protocol. """ tcp_osl_server = tos.TcpOslServer(host=_host, port=_port) - tcp_osl_server.set_timeout(timeout=8) + tcp_osl_server.set_timeout(timeout=10) return tcp_osl_server From 0bfd4c945459d9ef61ee3cf926bf7165e6c081f6 Mon Sep 17 00:00:00 2001 From: fahlberg Date: Tue, 18 Oct 2022 16:43:24 +0200 Subject: [PATCH 05/10] Removed superfluous "end of list" notification --- src/ansys/optislang/core/osl_process.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/ansys/optislang/core/osl_process.py b/src/ansys/optislang/core/osl_process.py index e17441698..2826dab2e 100644 --- a/src/ansys/optislang/core/osl_process.py +++ b/src/ansys/optislang/core/osl_process.py @@ -34,8 +34,7 @@ class ServerNotification(Enum): ACTOR_NAME_CHANGED = 13 ACTOR_CONTENTS_CHANGED = 14 ACTOR_DATA_CHANGED = 15 - NUM_NOTIFICATIONS = 16 - ALL = 17 + ALL = 16 class OslServerProcess: From fbdb84cd3be033014b43b27f524256b24a83525e Mon Sep 17 00:00:00 2001 From: fahlberg Date: Tue, 18 Oct 2022 16:46:45 +0200 Subject: [PATCH 06/10] By default, subscribe only for push notifications "SERVER_UP SERVER_DOWN" for general_listener. We don't want to receive (and osl to issue) all notifications. --- src/ansys/optislang/core/tcp_osl_server.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index 43ada2fce..7da81f7de 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -894,7 +894,10 @@ def __init__( listener.uid = self.__register_listener( host=listener.host, port=listener.port, - notifications=[ServerNotification.ALL], + notifications=[ + ServerNotification.SERVER_UP, + ServerNotification.SERVER_DOWN, + ], ) self.__listeners["main"] = listener self.__start_listeners_registration_thread() @@ -1664,7 +1667,10 @@ def _start_local(self, ini_timeout: float, shutdown_on_finished: bool) -> None: password=self.__password, listener=(listener.host, listener.port), listener_id=listener.uid, - notifications=[ServerNotification.ALL], + notifications=[ + ServerNotification.SERVER_UP, + ServerNotification.SERVER_DOWN, + ], shutdown_on_finished=shutdown_on_finished, logger=self._logger, ) From 0d9bb22ba76224cf4b81a8350e73dc53092a86c9 Mon Sep 17 00:00:00 2001 From: fahlberg Date: Tue, 18 Oct 2022 17:12:30 +0200 Subject: [PATCH 07/10] Minor spelling fixes --- src/ansys/optislang/core/tcp_osl_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index 7da81f7de..b4748beb4 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -1481,12 +1481,12 @@ def start(self, wait_for_started: bool = True, wait_for_finished: bool = True) - if not already_running and (wait_for_started or wait_for_finished): self._logger.info(f"Waiting for started") successfully_started = wait_for_started_queue.get() - self._logger.info(f"Successfully_started: {successfully_started}.") + self._logger.info(f"Successfully started: {successfully_started}.") if wait_for_finished and (successfully_started or already_running): self._logger.info(f"Waiting for finished") successfully_finished = wait_for_finished_queue.get() - self._logger.info(f"Successfully_finished: {successfully_finished}.") + self._logger.info(f"Successfully finished: {successfully_finished}.") else: time.sleep(1) From 0cd7b10c421a360f7c7111cdcad5371f58bcd732 Mon Sep 17 00:00:00 2001 From: fahlberg Date: Tue, 18 Oct 2022 17:13:46 +0200 Subject: [PATCH 08/10] Fixed listener thread termination handling --- src/ansys/optislang/core/tcp_osl_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index b4748beb4..e454ead0e 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -2018,15 +2018,15 @@ def __terminate_listener_thread( ) -> None: """Terminate listener thread if execution finished or failed.""" type = response.get("type", None) - if type in [ServerNotification.EXEC_FAILED.name or ServerNotification.CHECK_FAILED.name]: + if type in [ServerNotification.EXEC_FAILED.name, ServerNotification.CHECK_FAILED.name]: sender.stop_listening() sender.clear_callbacks() - target_queue.put("False") + target_queue.put(False) logger.error(f"Listener {sender.name} received error notification.") elif type in target_notifications: sender.stop_listening() sender.clear_callbacks() - target_queue.put("True") + target_queue.put(True) logger.debug(f"Listener {sender.name} received expected notification.") elif type is None: logger.error("Invalid response from server, push notification not evaluated.") From 2d218e183abd12dcc4b8c2692fd73ea1f5784d12 Mon Sep 17 00:00:00 2001 From: fahlberg Date: Tue, 18 Oct 2022 17:19:00 +0200 Subject: [PATCH 09/10] Use the new PROCESSING_STARTED push notification instead of the EXECUTION_STARTED notification. This ensures that start (wait_for_started=True) actually waits for the project status transitioning to PROCESSING. --- src/ansys/optislang/core/osl_process.py | 21 +++++++++++---------- src/ansys/optislang/core/server_commands.py | 5 +++-- src/ansys/optislang/core/tcp_osl_server.py | 10 ++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/ansys/optislang/core/osl_process.py b/src/ansys/optislang/core/osl_process.py index 2826dab2e..4d0728268 100644 --- a/src/ansys/optislang/core/osl_process.py +++ b/src/ansys/optislang/core/osl_process.py @@ -25,16 +25,17 @@ class ServerNotification(Enum): LOG_ERROR = 4 LOG_DEBUG = 5 EXECUTION_STARTED = 6 - EXECUTION_FINISHED = 7 - NOTHING_PROCESSED = 8 - CHECK_FAILED = 9 - EXEC_FAILED = 10 - ACTOR_STATE_CHANGED = 11 - ACTOR_ACTIVE_CHANGED = 12 - ACTOR_NAME_CHANGED = 13 - ACTOR_CONTENTS_CHANGED = 14 - ACTOR_DATA_CHANGED = 15 - ALL = 16 + PROCESSING_STARTED = 7 + EXECUTION_FINISHED = 8 + NOTHING_PROCESSED = 9 + CHECK_FAILED = 10 + EXEC_FAILED = 11 + ACTOR_STATE_CHANGED = 12 + ACTOR_ACTIVE_CHANGED = 13 + ACTOR_NAME_CHANGED = 14 + ACTOR_CONTENTS_CHANGED = 15 + ACTOR_DATA_CHANGED = 16 + ALL = 17 class OslServerProcess: diff --git a/src/ansys/optislang/core/server_commands.py b/src/ansys/optislang/core/server_commands.py index e738fe428..c5b6c0fef 100644 --- a/src/ansys/optislang/core/server_commands.py +++ b/src/ansys/optislang/core/server_commands.py @@ -1348,8 +1348,8 @@ def subscribe_for_push_notifications( Either ["ALL"] or Sequence picked from below options: Server: [ "SERVER_UP", "SERVER_DOWN" ] (always be sent by default). Logging: [ "LOG_INFO", "LOG_WARNING", "LOG_ERROR", "LOG_DEBUG" ]. - Project: [ "EXECUTION_STARTED", "EXECUTION_FINISHED", "NOTHING_PROCESSED", - "CHECK_FAILED", "EXEC_FAILED" ]. + Project: [ "EXECUTION_STARTED", "PROCESSING_STARTED", "EXECUTION_FINISHED", + "NOTHING_PROCESSED", "CHECK_FAILED", "EXEC_FAILED" ]. Nodes: [ "ACTOR_STATE_CHANGED", "ACTOR_ACTIVE_CHANGED", "ACTOR_NAME_CHANGED", "ACTOR_CONTENTS_CHANGED", "ACTOR_DATA_CHANGED" ]. node_types: Sequence, opt @@ -1366,6 +1366,7 @@ def subscribe_for_push_notifications( logging = ["LOG_INFO", "LOG_WARNING", "LOG_ERROR", "LOG_DEBUG"] project = [ "EXECUTION_STARTED", + "PROCESSING_STARTED", "EXECUTION_FINISHED", "NOTHING_PROCESSED", "CHECK_FAILED", diff --git a/src/ansys/optislang/core/tcp_osl_server.py b/src/ansys/optislang/core/tcp_osl_server.py index e454ead0e..c41f749cb 100644 --- a/src/ansys/optislang/core/tcp_osl_server.py +++ b/src/ansys/optislang/core/tcp_osl_server.py @@ -1447,7 +1447,7 @@ def start(self, wait_for_started: bool = True, wait_for_finished: bool = True) - exec_started_listener.add_callback( self.__class__.__terminate_listener_thread, ( - [ServerNotification.EXECUTION_STARTED.name], + [ServerNotification.PROCESSING_STARTED.name], wait_for_started_queue, self._logger, ), @@ -1487,8 +1487,6 @@ def start(self, wait_for_started: bool = True, wait_for_finished: bool = True) - self._logger.info(f"Waiting for finished") successfully_finished = wait_for_finished_queue.get() self._logger.info(f"Successfully finished: {successfully_finished}.") - else: - time.sleep(1) def stop(self, wait_for_finished: bool = True) -> None: """Stop project execution. @@ -1757,7 +1755,7 @@ def __create_exec_started_listener(self) -> TcpOslListener: host=exec_started_listener.host, port=exec_started_listener.port, notifications=[ - ServerNotification.EXECUTION_STARTED, + ServerNotification.PROCESSING_STARTED, ServerNotification.EXEC_FAILED, ServerNotification.CHECK_FAILED, ], @@ -1840,8 +1838,8 @@ def __register_listener( Either ["ALL"] or Sequence picked from below options: Server: [ "SERVER_UP", "SERVER_DOWN" ] (always be sent by default). Logging: [ "LOG_INFO", "LOG_WARNING", "LOG_ERROR", "LOG_DEBUG" ]. - Project: [ "EXECUTION_STARTED", "EXECUTION_FINISHED", "NOTHING_PROCESSED", - "CHECK_FAILED", "EXEC_FAILED" ]. + Project: [ "EXECUTION_STARTED", "PROCESSING_STARTED", "EXECUTION_FINISHED", + "NOTHING_PROCESSED", "CHECK_FAILED", "EXEC_FAILED" ]. Nodes: [ "ACTOR_STATE_CHANGED", "ACTOR_ACTIVE_CHANGED", "ACTOR_NAME_CHANGED", "ACTOR_CONTENTS_CHANGED", "ACTOR_DATA_CHANGED" ]. From 24672fc2d67ceab55d479add05506a68246be2c2 Mon Sep 17 00:00:00 2001 From: fahlberg Date: Wed, 19 Oct 2022 16:14:12 +0200 Subject: [PATCH 10/10] Fixed "--enable-notifications" process args. Multiple space-separated args need to be appended to subprocess "args" separately. --- src/ansys/optislang/core/osl_process.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ansys/optislang/core/osl_process.py b/src/ansys/optislang/core/osl_process.py index 4d0728268..f63ab98dc 100644 --- a/src/ansys/optislang/core/osl_process.py +++ b/src/ansys/optislang/core/osl_process.py @@ -470,11 +470,9 @@ def _get_process_args(self) -> List[str]: if self.__notifications is not None: # Subscribe to push notifications sent to the listener. - cmd_arg = "" + args.append("--enable-notifications") for notification in self.__notifications: - cmd_arg += notification.name - cmd_arg += " " - args.append(f"--enable-notifications={cmd_arg.strip()}") + args.append(notification.name) if self.__additional_args is not None: for arg_name, arg_value in self.__additional_args.items():