Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete job files when rerunning; customizable execution command #201

Merged
merged 7 commits into from
Nov 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename cleanup in JobDoc
gpetretto committed Nov 8, 2024
commit 2767e11facb11ec0324ddc8ecdc5db2cbe7392ad
2 changes: 1 addition & 1 deletion src/jobflow_remote/jobs/data.py
Original file line number Diff line number Diff line change
@@ -119,7 +119,7 @@ class RemoteInfo(BaseModel):
process_id: Optional[str] = None
retry_time_limit: Optional[datetime] = None
error: Optional[str] = None
cleanup: bool = False
prerun_cleanup: bool = False


class JobInfo(BaseModel):
6 changes: 3 additions & 3 deletions src/jobflow_remote/jobs/jobcontroller.py
Original file line number Diff line number Diff line change
@@ -1128,7 +1128,7 @@ def _full_rerun(
if child_doc["state"] != JobState.WAITING.value:
modified_jobs.append(child_doc["db_id"])
if delete_files:
child_doc_update["remote.cleanup"] = True
child_doc_update["remote.prerun_cleanup"] = True
child_lock.update_on_release = {"$set": child_doc_update}
updated_states[child_doc["uuid"]][child_doc["index"]] = (
JobState.WAITING
@@ -1146,7 +1146,7 @@ def _full_rerun(
job_doc_update = get_reset_job_base_dict()
job_doc_update["state"] = JobState.READY.value
if delete_files:
job_doc_update["remote.cleanup"] = True
job_doc_update["remote.prerun_cleanup"] = True

return job_doc_update, modified_jobs

@@ -1181,7 +1181,7 @@ def _reset_remote(self, doc: dict, delete_files: bool = True) -> dict:
job_doc_update = get_reset_job_base_dict()
job_doc_update["state"] = JobState.CHECKED_OUT.value
if delete_files:
job_doc_update["remote.cleanup"] = True
job_doc_update["remote.prerun_cleanup"] = True

return job_doc_update

6 changes: 3 additions & 3 deletions src/jobflow_remote/jobs/runner.py
Original file line number Diff line number Diff line change
@@ -576,10 +576,10 @@ def upload(self, lock: MongoLock) -> None:
worker = self.get_worker(doc["worker"])
host = self.get_host(doc["worker"])

# if cleanup is specified (job was likely rerun) delete the folder before
# if prerun_cleanup is specified (job was likely rerun) delete the folder before
# reuploading the data
run_dir = doc["run_dir"]
if doc.get("remote", {}).get("cleanup", False):
if doc.get("remote", {}).get("prerun_cleanup", False):
try:
deleted = safe_remove_job_files(
host=host, run_dir=run_dir, raise_on_error=True
@@ -639,7 +639,7 @@ def upload(self, lock: MongoLock) -> None:
"$set": {
"run_dir": remote_path,
"state": JobState.UPLOADED.value,
"remote.cleanup": False,
"remote.prerun_cleanup": False,
}
}
lock.update_on_release = set_output
8 changes: 5 additions & 3 deletions tests/db/cli/test_job.py
Original file line number Diff line number Diff line change
@@ -134,18 +134,20 @@ def test_rerun(job_controller, two_flows_four_jobs) -> None:
assert os.path.isdir(job_info.run_dir)
ji = job_controller.get_job_info(db_id="1")
assert ji.state == JobState.READY
assert ji.remote.cleanup
assert ji.remote.prerun_cleanup

# set the job back to completed and set remote.prerun_cleanup=False. rerun with --no-delete
assert job_controller.set_job_state(JobState.COMPLETED, db_id="1")
assert job_controller.set_job_doc_properties({"remote.cleanup": False}, db_id="1")
assert job_controller.set_job_doc_properties(
{"remote.prerun_cleanup": False}, db_id="1"
)
run_check_cli(
["job", "rerun", "-did", "1", "-f", "-nd"],
)

ji = job_controller.get_job_info(db_id="1")
assert ji.state == JobState.READY
assert not ji.remote.cleanup
assert not ji.remote.prerun_cleanup


def test_retry(job_controller, two_flows_four_jobs) -> None:
10 changes: 5 additions & 5 deletions tests/db/jobs/test_jobcontroller.py
Original file line number Diff line number Diff line change
@@ -330,8 +330,8 @@ def test_rerun_failed(job_controller, runner) -> None:
j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index)
assert j1_path.exists()
assert j3_path.exists()
assert j1_info.remote.cleanup
assert j3_info.remote.cleanup
assert j1_info.remote.prerun_cleanup
assert j3_info.remote.prerun_cleanup

# create a file in the folders, to be sure that in the meantime the folders have been removed
# during the upload phase
@@ -355,8 +355,8 @@ def test_rerun_failed(job_controller, runner) -> None:
j3_info = job_controller.get_job_info(job_id=j3.uuid, job_index=j3.index)

# also check that prerun_cleanup has been set to False
assert not j1_info.remote.cleanup
assert not j3_info.remote.cleanup
assert not j1_info.remote.prerun_cleanup
assert not j3_info.remote.prerun_cleanup

assert job_controller.count_jobs(job_ids=(j4.uuid, 2)) == 1

@@ -431,7 +431,7 @@ def submit_error(self, lock) -> NoReturn:
# do not run the job explicitly, as it is already checked in other tests
j1_info = job_controller.get_job_info(job_id=j1.uuid, job_index=j1.index)
assert Path(j1_info.run_dir).exists()
assert j1_info.remote.cleanup
assert j1_info.remote.prerun_cleanup


def test_retry(job_controller, monkeypatch, runner) -> None: