diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 8ab2669..621e684 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -11,9 +11,11 @@ from io import BytesIO import jwt import logging +import os import pathlib import requests import sys +import tempfile import time from contextlib import nullcontext @@ -947,22 +949,66 @@ async def whoami(self, machine=None) -> Optional[str]: async def submit( self, machine: str, - job_script: str, + job_script: Optional[str] = None, local_file: Optional[bool] = True, + script_str: Optional[str] = None, + script_local_path: Optional[str] = None, + script_remote_path: Optional[str] = None, account: Optional[str] = None, - env_vars: Optional[dict[str, Any]] = None + env_vars: Optional[dict[str, Any]] = None, ) -> t.JobSubmit: - """Submits a batch script to SLURM on the target system + """Submits a batch script to SLURM on the target system. One of `script_str`, `script_local` and `script_remote` needs to be set. :param machine: the machine name where the scheduler belongs to - :param job_script: the path of the script (if it's local it can be relative path, if it is on the machine it has to be the absolute path) - :param local_file: batch file can be local (default) or on the machine's filesystem + :param job_script: [deprecated] use `script_str`, `script_local_path` or `script_remote_path` + :param local_file: [deprecated] + :param script_str: the content of the script to be submitted + :param script_local_path: the path of the script on the local file system + :param script_remote_path: the full path of the script on the remote file system :param account: submit the job with this project account :param env_vars: dictionary (varName, value) defining environment variables to be exported for the job :calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path` GET `/tasks/{taskid}` """ + if [ + script_str is None, + script_local_path is None, + script_remote_path is None, + job_script is None + ].count(False) != 1: + logger.error( + "Only one of the arguments `script_str`, `script_local_path`, " + "`script_remote_path`, and `job_script` can be set at a time. " + "`job_script` is deprecated, so prefer one of the others." + ) + raise ValueError( + "Only one of the arguments `script_str`, `script_local_path`, " + "`script_remote_path`, and `job_script` can be set at a time. " + ) + + if job_script is not None: + logger.warning("`local_file` argument is deprecated, please use one of " + "`script_str`, `script_local_path` or `script_remote_path` instead") + + if local_file: + script_local_path = job_script + else: + script_remote_path = job_script + + if script_str is not None: + is_path = False + is_local = True + job_script_file = None + elif script_local_path is not None: + is_path = True + is_local = True + job_script_file = script_local_path + elif script_remote_path is not None: + is_path = True + is_local = False + job_script_file = script_remote_path + env = json.dumps(env_vars) if env_vars else None data = {} if account: @@ -971,21 +1017,35 @@ async def submit( if env: data["env"] = env - if local_file: - with open(job_script, "rb") as f: + context: Any = ( + tempfile.TemporaryDirectory() + if not is_path + else nullcontext(None) + ) + with context as tmpdirname: + if not is_path: + logger.info(f"Created temporary directory {tmpdirname}") + with open(os.path.join(tmpdirname, "script.batch"), "w") as temp_file: + temp_file.write(script_str) # type: ignore + + job_script_file = os.path.join(tmpdirname, "script.batch") + + if is_local: + with open(job_script_file, "rb") as f: # type: ignore + resp = await self._post_request( + endpoint="/compute/jobs/upload", + additional_headers={"X-Machine-Name": machine}, + files={"file": f}, + data=data, + ) + else: + assert isinstance(job_script_file, str) + data["targetPath"] = job_script_file resp = await self._post_request( - endpoint="/compute/jobs/upload", + endpoint="/compute/jobs/path", additional_headers={"X-Machine-Name": machine}, - files={"file": f}, data=data, ) - else: - data["targetPath"] = job_script - resp = await self._post_request( - endpoint="/compute/jobs/path", - additional_headers={"X-Machine-Name": machine}, - data=data, - ) json_response = self._json_response([resp], 201) logger.info(f"Job submission task: {json_response['task_id']}") diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py index 2bc068d..d0f1c0b 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/BasicClient.py @@ -9,9 +9,11 @@ import itertools import jwt import logging +import os import pathlib import requests import sys +import tempfile import time from contextlib import nullcontext @@ -834,26 +836,85 @@ def _acct_request(self, machine: str, jobs=None, start_time=None, end_time=None) def submit( self, machine: str, - job_script: str, + job_script: Optional[str] = None, local_file: Optional[bool] = True, + script_str: Optional[str] = None, + script_local_path: Optional[str] = None, + script_remote_path: Optional[str] = None, account: Optional[str] = None, - env_vars: Optional[dict[str, Any]] = None + env_vars: Optional[dict[str, Any]] = None, ) -> t.JobSubmit: - """Submits a batch script to SLURM on the target system + """Submits a batch script to SLURM on the target system. One of `script_str`, `script_local` and `script_remote` needs to be set. :param machine: the machine name where the scheduler belongs to - :param job_script: the path of the script (if it's local it can be relative path, if it is on the machine it has to be the absolute path) - :param local_file: batch file can be local (default) or on the machine's filesystem + :param job_script: [deprecated] use `script_str`, `script_local_path` or `script_remote_path` + :param local_file: [deprecated] + :param script_str: the content of the script to be submitted + :param script_local_path: the path of the script on the local file system + :param script_remote_path: the full path of the script on the remote file system :param account: submit the job with this project account :param env_vars: dictionary (varName, value) defining environment variables to be exported for the job :calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path` GET `/tasks/{taskid}` """ + if [ + script_str is None, + script_local_path is None, + script_remote_path is None, + job_script is None + ].count(False) != 1: + logger.error( + "Only one of the arguments `script_str`, `script_local_path`, " + "`script_remote_path`, and `job_script` can be set at a time. " + "`job_script` is deprecated, so prefer one of the others." + ) + raise ValueError( + "Only one of the arguments `script_str`, `script_local_path`, " + "`script_remote_path`, and `job_script` can be set at a time. " + ) + + if job_script is not None: + logger.warning("`local_file` argument is deprecated, please use one of " + "`script_str`, `script_local_path` or `script_remote_path` instead") + + if local_file: + script_local_path = job_script + else: + script_remote_path = job_script + + if script_str is not None: + is_path = False + is_local = True + job_script_file = None + elif script_local_path is not None: + is_path = True + is_local = True + job_script_file = script_local_path + elif script_remote_path is not None: + is_path = True + is_local = False + job_script_file = script_remote_path + self._current_method_requests = [] - env = json.dumps(env_vars) if env_vars else None - json_response = self._submit_request(machine, job_script, local_file, account, env) - logger.info(f"Job submission task: {json_response['task_id']}") + + # Check if `job_script` is a filename or a job script and create a file if necessary + context: Any = ( + tempfile.TemporaryDirectory() + if not is_path + else nullcontext(None) + ) + with context as tmpdirname: + if not is_path: + logger.info(f"Created temporary directory {tmpdirname}") + with open(os.path.join(tmpdirname, "script.batch"), "w") as temp_file: + temp_file.write(script_str) # type: ignore + + job_script_file = os.path.join(tmpdirname, "script.batch") + + env = json.dumps(env_vars) if env_vars else None + json_response = self._submit_request(machine, job_script_file, is_local, account, env) + logger.info(f"Job submission task: {json_response['task_id']}") # Inject taskid in the result result = self._poll_tasks( diff --git a/firecrest/cli/__init__.py b/firecrest/cli/__init__.py index 30b6610..de68d46 100644 --- a/firecrest/cli/__init__.py +++ b/firecrest/cli/__init__.py @@ -809,9 +809,12 @@ def submit( help="The batch file can be local (default) or on the system's filesystem.", ), ): - """Submit a batch script to the workload manger of the target system""" + """Submit a batch script to the workload manager of the target system""" try: - console.print(client.submit(system, job_script, local, account=account)) + if local: + console.print(client.submit(system, script_local_path=job_script, account=account)) + else: + console.print(client.submit(system, script_remote_path=job_script, account=account)) except Exception as e: examine_exeption(e) raise typer.Exit(code=1)