Skip to content

Commit

Permalink
Allow passing batch script to submit method as a string (#77)
Browse files Browse the repository at this point in the history
* Add env_vars args in job submission

* Add option for submission from string in basic client

* Remove unnecessary assignment

* Small refactoring

* Fix docstring

* Add str submission in async client + refactor

* Relax typing to avoid errors

* Add exception for wrong arguments in submission

* Add error message in exception

* Fix bug

* Fix typo
  • Loading branch information
ekouts authored Nov 17, 2023
1 parent a2a50a8 commit 2bbf26b
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 26 deletions.
92 changes: 76 additions & 16 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
from io import BytesIO
import jwt
import logging
import os
import pathlib
import requests
import sys
import tempfile
import time

from contextlib import nullcontext
Expand Down Expand Up @@ -947,22 +949,66 @@ async def whoami(self, machine=None) -> Optional[str]:
async def submit(
self,
machine: str,
job_script: str,
job_script: Optional[str] = None,
local_file: Optional[bool] = True,
script_str: Optional[str] = None,
script_local_path: Optional[str] = None,
script_remote_path: Optional[str] = None,
account: Optional[str] = None,
env_vars: Optional[dict[str, Any]] = None
env_vars: Optional[dict[str, Any]] = None,
) -> t.JobSubmit:
"""Submits a batch script to SLURM on the target system
"""Submits a batch script to SLURM on the target system. One of `script_str`, `script_local` and `script_remote` needs to be set.
:param machine: the machine name where the scheduler belongs to
:param job_script: the path of the script (if it's local it can be relative path, if it is on the machine it has to be the absolute path)
:param local_file: batch file can be local (default) or on the machine's filesystem
:param job_script: [deprecated] use `script_str`, `script_local_path` or `script_remote_path`
:param local_file: [deprecated]
:param script_str: the content of the script to be submitted
:param script_local_path: the path of the script on the local file system
:param script_remote_path: the full path of the script on the remote file system
:param account: submit the job with this project account
:param env_vars: dictionary (varName, value) defining environment variables to be exported for the job
:calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path`
GET `/tasks/{taskid}`
"""
if [
script_str is None,
script_local_path is None,
script_remote_path is None,
job_script is None
].count(False) != 1:
logger.error(
"Only one of the arguments `script_str`, `script_local_path`, "
"`script_remote_path`, and `job_script` can be set at a time. "
"`job_script` is deprecated, so prefer one of the others."
)
raise ValueError(
"Only one of the arguments `script_str`, `script_local_path`, "
"`script_remote_path`, and `job_script` can be set at a time. "
)

if job_script is not None:
logger.warning("`local_file` argument is deprecated, please use one of "
"`script_str`, `script_local_path` or `script_remote_path` instead")

if local_file:
script_local_path = job_script
else:
script_remote_path = job_script

if script_str is not None:
is_path = False
is_local = True
job_script_file = None
elif script_local_path is not None:
is_path = True
is_local = True
job_script_file = script_local_path
elif script_remote_path is not None:
is_path = True
is_local = False
job_script_file = script_remote_path

env = json.dumps(env_vars) if env_vars else None
data = {}
if account:
Expand All @@ -971,21 +1017,35 @@ async def submit(
if env:
data["env"] = env

if local_file:
with open(job_script, "rb") as f:
context: Any = (
tempfile.TemporaryDirectory()
if not is_path
else nullcontext(None)
)
with context as tmpdirname:
if not is_path:
logger.info(f"Created temporary directory {tmpdirname}")
with open(os.path.join(tmpdirname, "script.batch"), "w") as temp_file:
temp_file.write(script_str) # type: ignore

job_script_file = os.path.join(tmpdirname, "script.batch")

if is_local:
with open(job_script_file, "rb") as f: # type: ignore
resp = await self._post_request(
endpoint="/compute/jobs/upload",
additional_headers={"X-Machine-Name": machine},
files={"file": f},
data=data,
)
else:
assert isinstance(job_script_file, str)
data["targetPath"] = job_script_file
resp = await self._post_request(
endpoint="/compute/jobs/upload",
endpoint="/compute/jobs/path",
additional_headers={"X-Machine-Name": machine},
files={"file": f},
data=data,
)
else:
data["targetPath"] = job_script
resp = await self._post_request(
endpoint="/compute/jobs/path",
additional_headers={"X-Machine-Name": machine},
data=data,
)

json_response = self._json_response([resp], 201)
logger.info(f"Job submission task: {json_response['task_id']}")
Expand Down
77 changes: 69 additions & 8 deletions firecrest/BasicClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import itertools
import jwt
import logging
import os
import pathlib
import requests
import sys
import tempfile
import time

from contextlib import nullcontext
Expand Down Expand Up @@ -834,26 +836,85 @@ def _acct_request(self, machine: str, jobs=None, start_time=None, end_time=None)
def submit(
self,
machine: str,
job_script: str,
job_script: Optional[str] = None,
local_file: Optional[bool] = True,
script_str: Optional[str] = None,
script_local_path: Optional[str] = None,
script_remote_path: Optional[str] = None,
account: Optional[str] = None,
env_vars: Optional[dict[str, Any]] = None
env_vars: Optional[dict[str, Any]] = None,
) -> t.JobSubmit:
"""Submits a batch script to SLURM on the target system
"""Submits a batch script to SLURM on the target system. One of `script_str`, `script_local` and `script_remote` needs to be set.
:param machine: the machine name where the scheduler belongs to
:param job_script: the path of the script (if it's local it can be relative path, if it is on the machine it has to be the absolute path)
:param local_file: batch file can be local (default) or on the machine's filesystem
:param job_script: [deprecated] use `script_str`, `script_local_path` or `script_remote_path`
:param local_file: [deprecated]
:param script_str: the content of the script to be submitted
:param script_local_path: the path of the script on the local file system
:param script_remote_path: the full path of the script on the remote file system
:param account: submit the job with this project account
:param env_vars: dictionary (varName, value) defining environment variables to be exported for the job
:calls: POST `/compute/jobs/upload` or POST `/compute/jobs/path`
GET `/tasks/{taskid}`
"""
if [
script_str is None,
script_local_path is None,
script_remote_path is None,
job_script is None
].count(False) != 1:
logger.error(
"Only one of the arguments `script_str`, `script_local_path`, "
"`script_remote_path`, and `job_script` can be set at a time. "
"`job_script` is deprecated, so prefer one of the others."
)
raise ValueError(
"Only one of the arguments `script_str`, `script_local_path`, "
"`script_remote_path`, and `job_script` can be set at a time. "
)

if job_script is not None:
logger.warning("`local_file` argument is deprecated, please use one of "
"`script_str`, `script_local_path` or `script_remote_path` instead")

if local_file:
script_local_path = job_script
else:
script_remote_path = job_script

if script_str is not None:
is_path = False
is_local = True
job_script_file = None
elif script_local_path is not None:
is_path = True
is_local = True
job_script_file = script_local_path
elif script_remote_path is not None:
is_path = True
is_local = False
job_script_file = script_remote_path

self._current_method_requests = []
env = json.dumps(env_vars) if env_vars else None
json_response = self._submit_request(machine, job_script, local_file, account, env)
logger.info(f"Job submission task: {json_response['task_id']}")

# Check if `job_script` is a filename or a job script and create a file if necessary
context: Any = (
tempfile.TemporaryDirectory()
if not is_path
else nullcontext(None)
)
with context as tmpdirname:
if not is_path:
logger.info(f"Created temporary directory {tmpdirname}")
with open(os.path.join(tmpdirname, "script.batch"), "w") as temp_file:
temp_file.write(script_str) # type: ignore

job_script_file = os.path.join(tmpdirname, "script.batch")

env = json.dumps(env_vars) if env_vars else None
json_response = self._submit_request(machine, job_script_file, is_local, account, env)
logger.info(f"Job submission task: {json_response['task_id']}")

# Inject taskid in the result
result = self._poll_tasks(
Expand Down
7 changes: 5 additions & 2 deletions firecrest/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,12 @@ def submit(
help="The batch file can be local (default) or on the system's filesystem.",
),
):
"""Submit a batch script to the workload manger of the target system"""
"""Submit a batch script to the workload manager of the target system"""
try:
console.print(client.submit(system, job_script, local, account=account))
if local:
console.print(client.submit(system, script_local_path=job_script, account=account))
else:
console.print(client.submit(system, script_remote_path=job_script, account=account))
except Exception as e:
examine_exeption(e)
raise typer.Exit(code=1)
Expand Down

0 comments on commit 2bbf26b

Please sign in to comment.