Skip to content

Commit

Permalink
fix upload and download job commands, add clone_job (NVIDIA#397)
Browse files Browse the repository at this point in the history
* fix upload and download job commands, add clone_job

* remove reference to private from fuel
  • Loading branch information
nvkevlu authored Apr 7, 2022
1 parent 2f694e3 commit 8ee1a33
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
43 changes: 32 additions & 11 deletions nvflare/fuel/hci/server/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import nvflare.fuel.hci.file_transfer_defs as ftd
from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec
from nvflare.fuel.hci.base64_utils import (
b64str_to_binary_file,
b64str_to_bytes,
Expand Down Expand Up @@ -256,10 +257,21 @@ def upload_job(self, conn: Connection, args: List[str]):
data_bytes = b64str_to_bytes(zip_b64str)
meta = json.loads(b64str_to_bytes(meta_b64str))
engine = conn.app_ctx
meta = engine.job_def_manager.create(meta, data_bytes)
conn.set_prop("meta", meta)
conn.set_prop("upload_job_id", meta.get(JobMetaKey.JOB_ID))
conn.append_string("Uploaded job {}".format(meta.get(JobMetaKey.JOB_ID)))
try:
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
meta = job_def_manager.create(meta, data_bytes, fl_ctx)
conn.set_prop("meta", meta)
conn.set_prop("upload_job_id", meta.get(JobMetaKey.JOB_ID))
conn.append_string("Uploaded job {}".format(meta.get(JobMetaKey.JOB_ID)))
except Exception as e:
conn.append_error("Exception occurred trying to upload job: " + str(e))
return
conn.append_success("")

def download_job(self, conn: Connection, args: List[str]):
if len(args) != 2:
Expand All @@ -269,13 +281,22 @@ def download_job(self, conn: Connection, args: List[str]):
job_id = args[1]

engine = conn.app_ctx
data_bytes = engine.job_def_manager.get_content(job_id)
job_id_dir = os.path.join(self.download_dir, job_id)
if os.path.exists(job_id_dir):
shutil.rmtree(job_id_dir)
os.mkdir(job_id_dir)
unzip_all_from_bytes(data_bytes, job_id_dir)

try:
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
data_bytes = job_def_manager.get_content(job_id, fl_ctx)
job_id_dir = os.path.join(self.download_dir, job_id)
if os.path.exists(job_id_dir):
shutil.rmtree(job_id_dir)
os.mkdir(job_id_dir)
unzip_all_from_bytes(data_bytes, job_id_dir)
except Exception as e:
conn.append_error("Exception occurred trying to get job from store: " + str(e))
return
try:
data = zip_directory_to_bytes(self.download_dir, job_id)
b64str = bytes_to_b64str(data)
Expand Down
25 changes: 24 additions & 1 deletion nvflare/private/fed/server/job_cmds.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
from typing import List

from nvflare.apis.job_def import JobMetaKey
from nvflare.apis.job_def_manager_spec import JobDefManagerSpec
from nvflare.fuel.hci.conn import Connection
from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec
Expand Down Expand Up @@ -137,4 +138,26 @@ def abort_job(self, conn: Connection, args: List[str]):
pass

def clone_job(self, conn: Connection, args: List[str]):
pass
if len(args) != 2:
conn.append_error("syntax error: usage: clone_job job_id")
job_id = args[1]
engine = conn.app_ctx
try:
if not isinstance(engine, ServerEngine):
raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}")
job_def_manager = engine.job_def_manager
if not isinstance(job_def_manager, JobDefManagerSpec):
raise TypeError(
f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}"
)
with engine.new_context() as fl_ctx:
job = job_def_manager.get_job(job_id, fl_ctx)
data_bytes = job_def_manager.get_content(job_id, fl_ctx)
meta = job_def_manager.create(job.meta, data_bytes, fl_ctx)
conn.set_prop("meta", meta)
conn.set_prop("upload_job_id", meta.get(JobMetaKey.JOB_ID))
conn.append_string("Cloned job {} as {}".format(job_id, meta.get(JobMetaKey.JOB_ID)))
except Exception as e:
conn.append_error("Exception occurred trying to clone job: " + str(e))
return
conn.append_success("")

0 comments on commit 8ee1a33

Please sign in to comment.