Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make job id's usable outside project #2246

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions spinetoolbox/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ class SpineToolboxProject(MetaObject):
specification_saved = Signal(str, str)
"""Emitted after a specification has been saved."""

LOCAL_EXECUTION_JOB_ID = "1"

def __init__(self, toolbox, p_dir, plugin_specs, app_settings, settings, logger):
"""
Args:
Expand Down Expand Up @@ -978,16 +980,16 @@ def _execute_dags(self, dags, execution_permits_list):
if self._engine_workers:
self._logger.msg_error.emit("Execution already in progress.")
return
self.job_id = self.prepare_remote_execution()
if not self.job_id:
job_id = self.prepare_remote_execution()
if not job_id:
self.project_execution_finished.emit()
return
settings = make_settings_dict_for_engine(self._app_settings)
darker_fg_color = QColor(FG_COLOR).darker().name()
darker = lambda x: f'<span style="color: {darker_fg_color}">{x}</span>'
for k, (dag, execution_permits) in enumerate(zip(dags, execution_permits_list)):
dag_identifier = f"{k + 1}/{len(dags)}"
worker = self.create_engine_worker(dag, execution_permits, dag_identifier, settings)
worker = self.create_engine_worker(dag, execution_permits, dag_identifier, settings, job_id)
if worker is None:
continue
self._logger.msg.emit("<b>Starting DAG {0}</b>".format(dag_identifier))
Expand All @@ -1003,14 +1005,15 @@ def _execute_dags(self, dags, execution_permits_list):
for worker in self._engine_workers:
worker.start()

def create_engine_worker(self, dag, execution_permits, dag_identifier, settings):
def create_engine_worker(self, dag, execution_permits, dag_identifier, settings, job_id):
"""Creates and returns a SpineEngineWorker to execute given *validated* dag.

Args:
dag (DiGraph): The dag
execution_permits (dict): mapping item names to a boolean indicating whether to execute it or skip it
dag_identifier (str): A string identifying the dag, for logging
settings (dict): project and app settings to send to the spine engine.
job_id (str): job id

Returns:
SpineEngineWorker
Expand Down Expand Up @@ -1045,7 +1048,7 @@ def create_engine_worker(self, dag, execution_permits, dag_identifier, settings)
"settings": settings,
"project_dir": self.project_dir.replace(os.sep, "/"),
}
worker = SpineEngineWorker(data, dag, dag_identifier, items, connections, self._logger, self.job_id)
worker = SpineEngineWorker(data, dag, dag_identifier, items, connections, self._logger, job_id)
return worker

def _handle_engine_worker_finished(self, worker):
Expand All @@ -1068,7 +1071,7 @@ def _handle_engine_worker_finished(self, worker):
for item, direction, state in finished_worker.successful_executions:
item.handle_execution_successful(direction, state)
finished_worker.clean_up()
self.finalize_remote_execution()
self.finalize_remote_execution(worker.job_id)
self._engine_workers.clear()
self.project_execution_finished.emit()

Expand Down Expand Up @@ -1418,11 +1421,11 @@ def prepare_remote_execution(self):
"""Pings the server and sends the project as a zip-file to server.

Returns:
str: Job Id if server is ready for remote execution, empty string if something went wrong or "1" if
local execution is enabled.
str: Job Id if server is ready for remote execution, empty string if something went wrong
or LOCAL_EXECUTION_JOB_ID if local execution is enabled.
"""
if not self._app_settings.value("engineSettings/remoteExecutionEnabled", defaultValue="false") == "true":
return "1" # Something that isn't False
return self.LOCAL_EXECUTION_JOB_ID
host, port, sec_model, sec_folder = self._toolbox.engine_server_settings()
if not host:
self._logger.msg_error.emit(
Expand Down Expand Up @@ -1469,8 +1472,12 @@ def prepare_remote_execution(self):
engine_client.close()
return job_id

def finalize_remote_execution(self):
"""Sends a request to server to remove the project directory and removes the project ZIP file from client."""
def finalize_remote_execution(self, job_id):
"""Sends a request to server to remove the project directory and removes the project ZIP file from client.y

Args:
job_id (str): job id
"""
if not self._app_settings.value("engineSettings/remoteExecutionEnabled", defaultValue="false") == "true":
return
host, port, sec_model, sec_folder = self._toolbox.engine_server_settings()
Expand All @@ -1481,7 +1488,7 @@ def finalize_remote_execution(self):
f"Server is not responding. {e}. " f"Check settings in <b>File->Settings->Engine</b>."
)
return
engine_client.remove_project_from_server(self.job_id)
engine_client.remove_project_from_server(job_id)
engine_client.close()
project_zip_file = os.path.abspath(os.path.join(self.project_dir, os.pardir, PROJECT_ZIP_FILENAME + ".zip"))
if not os.path.isfile(project_zip_file):
Expand Down
7 changes: 6 additions & 1 deletion spinetoolbox/spine_engine_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ def __init__(self, engine_data, dag, dag_identifier, project_items, connections,
project_items (dict): mapping from project item name to :class:`ProjectItem`
connections (dict): mapping from jump name to :class:`LoggingConnection` or :class:`LoggingJump`
logger (LoggerInterface): a logger
job_id: Job Id for remote execution
job_id (str): Job id for remote execution
"""
super().__init__()
self._engine_data = engine_data
exec_remotely = engine_data["settings"].get("engineSettings/remoteExecutionEnabled", "false") == "true"
self._job_id = job_id
self._engine_mngr = make_engine_manager(exec_remotely, job_id)
self.dag = dag
self.dag_identifier = dag_identifier
Expand All @@ -148,6 +149,10 @@ def __init__(self, engine_data, dag, dag_identifier, project_items, connections,
self.moveToThread(self._thread)
self._thread.started.connect(self.do_work)

@property
def job_id(self):
return self._job_id

@property
def engine_data(self):
"""Engine data dictionary."""
Expand Down