diff --git a/nvflare/apis/impl/job_def_manager.py b/nvflare/apis/impl/job_def_manager.py index e70e065d7ed..1c38b85e34f 100644 --- a/nvflare/apis/impl/job_def_manager.py +++ b/nvflare/apis/impl/job_def_manager.py @@ -24,6 +24,7 @@ from nvflare.apis.fl_context import FLContext from nvflare.apis.job_def import Job, JobMetaKey, job_from_meta from nvflare.apis.job_def_manager_spec import JobDefManagerSpec, RunStatus +from nvflare.apis.server_engine_spec import ServerEngineSpec from nvflare.apis.storage import StorageSpec from nvflare.apis.study_manager_spec import StudyManagerSpec from nvflare.fuel.hci.zip_utils import unzip_all_from_bytes, zip_directory_to_bytes @@ -62,6 +63,10 @@ def __init__(self, reviewer_name, fl_ctx: FLContext): self.reviewer_name = reviewer_name engine = fl_ctx.get_engine() self.study_manager = engine.get_component("study_manager") + if not isinstance(self.study_manager, StudyManagerSpec): + raise TypeError( + f"engine should have a study manager component of type StudyManagerSpec, but got {type(self.study_manager)}" + ) def filter_job(self, meta: dict): study = self.study_manager.get_study(meta[JobMetaKey.STUDY_NAME]) @@ -77,6 +82,9 @@ def filter_job(self, meta: dict): return True +# TODO:: use try block around storage calls + + class SimpleJobDefManager(JobDefManagerSpec): def __init__(self, uri_root: str = "jobs", job_store_id: str = "job_store", temp_dir: str = "/tmp"): super().__init__() @@ -86,6 +94,15 @@ def __init__(self, uri_root: str = "jobs", job_store_id: str = "job_store", temp raise ValueError("temp_dir {} is not a valid dir".format(temp_dir)) self.temp_dir = temp_dir + def _get_job_store(self, fl_ctx): + engine = fl_ctx.get_engine() + if not isinstance(engine, ServerEngineSpec): + raise TypeError(f"engine should be of type ServerEngineSpec, but got {type(engine)}") + store = engine.get_component(self.job_store_id) + if not isinstance(store, StorageSpec): + raise TypeError(f"engine should have a job store component of type StorageSpec, but got {type(store)}") + return store + def job_uri(self, jid: str): return os.path.join(self.uri_root, jid) @@ -102,14 +119,12 @@ def create(self, meta: dict, uploaded_content: bytes, fl_ctx: FLContext) -> Dict meta[JobMetaKey.STATUS] = RunStatus.SUBMITTED # write it to the store - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) store.create_object(self.job_uri(jid), uploaded_content, meta, overwrite_existing=True) return meta def delete(self, jid: str, fl_ctx: FLContext): - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) store.delete_object(self.job_uri(jid)) def _validate_meta(self, meta): @@ -136,14 +151,12 @@ def _validate_uploaded_content(self, uploaded_content) -> bool: pass def get_job(self, jid: str, fl_ctx: FLContext) -> Job: - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) job_meta = store.get_meta(self.job_uri(jid)) return job_from_meta(job_meta) def set_results_uri(self, jid: str, result_uri: str, fl_ctx: FLContext): - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) updated_meta = {JobMetaKey.RESULT_LOCATION: result_uri} store.update_meta(self.job_uri(jid), updated_meta, replace=False) return self.get_job(jid, fl_ctx) @@ -160,8 +173,7 @@ def get_apps(self, job: Job, fl_ctx: FLContext) -> Dict[str, bytes]: return result_dict def _load_job_data_from_store(self, jid: str, fl_ctx: FLContext): - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) data_bytes = store.get_data(self.job_uri(jid)) job_id_dir = os.path.join(self.temp_dir, jid) if os.path.exists(job_id_dir): @@ -171,36 +183,32 @@ def _load_job_data_from_store(self, jid: str, fl_ctx: FLContext): return job_id_dir def get_content(self, jid: str, fl_ctx: FLContext) -> bytes: - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) return store.get_data(self.job_uri(jid)) def set_status(self, jid: str, status: RunStatus, fl_ctx: FLContext): meta = {JobMetaKey.STATUS: status} - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) store.update_meta(uri=self.job_uri(jid), meta=meta, replace=False) def update_meta(self, jid: str, meta, fl_ctx: FLContext): - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) store.update_meta(uri=self.job_uri(jid), meta=meta, replace=False) - def list_all(self, fl_ctx: FLContext) -> List[Job]: + def get_all_jobs(self, fl_ctx: FLContext) -> List[Job]: job_filter = _AllJobsFilter() self._scan(job_filter, fl_ctx) return job_filter.result def _scan(self, job_filter: _JobFilter, fl_ctx: FLContext): - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) jid_paths = store.list_objects(self.uri_root) if not jid_paths: return for jid_path in jid_paths: jid = pathlib.PurePath(jid_path).name - meta = self.get_job(jid, fl_ctx).meta + meta = store.get_meta(self.job_uri(jid)) if meta: ok = job_filter.filter_job(meta) if not ok: @@ -211,7 +219,7 @@ def get_jobs_by_status(self, status, fl_ctx: FLContext) -> List[Job]: self._scan(job_filter, fl_ctx) return job_filter.result - def get_jobs_waiting_for_review(self, reviewer_name: str, fl_ctx: FLContext) -> List[Dict[str, Any]]: + def get_jobs_waiting_for_review(self, reviewer_name: str, fl_ctx: FLContext) -> List[Job]: job_filter = _ReviewerFilter(reviewer_name, fl_ctx) self._scan(job_filter, fl_ctx) return job_filter.result @@ -227,7 +235,6 @@ def set_approval( meta[JobMetaKey.APPROVALS] = approvals approvals[reviewer_name] = (approved, note) updated_meta = {JobMetaKey.APPROVALS: approvals} - engine = fl_ctx.get_engine() - store = engine.get_component(self.job_store_id) + store = self._get_job_store(fl_ctx) store.update_meta(self.job_uri(jid), updated_meta, replace=False) return meta diff --git a/nvflare/apis/job_def.py b/nvflare/apis/job_def.py index 18caba27a95..7f871f6fd0b 100644 --- a/nvflare/apis/job_def.py +++ b/nvflare/apis/job_def.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy from enum import Enum -from typing import Dict, List +from typing import Dict, List, Optional from nvflare.apis.fl_context import FLContext from nvflare.apis.job_def_manager_spec import JobDefManagerSpec @@ -49,24 +48,36 @@ def __repr__(self): class Job: - """Job object containing the job metadata. - - Args: - job_id: Job ID - study_name: Study name - resource_spec: Resource specification with information on the resources of each client - deploy_map: Deploy map specifying each app and the sites that it should be deployed to - meta: full contents of the persisted metadata for the job for persistent storage - """ - - def __init__(self, job_id, study_name, resource_spec, deploy_map, meta): + def __init__( + self, + job_id: str, + study_name: str, + resource_spec: Dict[str, Dict], + deploy_map: Dict[str, List[str]], + meta, + min_sites: int = 1, + required_sites: Optional[List[str]] = None, + ): + """Job object containing the job metadata. + + Args: + job_id: Job ID + study_name: Study name + resource_spec: Resource specification with information on the resources of each client + deploy_map: Deploy map specifying each app and the sites that it should be deployed to + meta: full contents of the persisted metadata for the job for persistent storage + min_sites (int): minimum number of sites + required_sites: A list of required site names + """ self.job_id = job_id self.study = study_name - # self.num_clients = num_clients # some way to specify minimum clients needed sites - self.resource_spec = resource_spec # resource_requirements should be {client name: resource} + self.resource_spec = resource_spec # resource_requirements should be {site name: resource} self.deploy_map = deploy_map # should be {app name: a list of sites} self.meta = meta + self.min_sites = min_sites + self.required_sites = required_sites + self.dispatcher_id = None self.dispatch_time = None @@ -92,8 +103,8 @@ def get_deployment(self) -> Dict[str, List[str]]: ] }, - Returns: contents of deploy_map as a dictionary of strings of app names with their corresponding sites - + Returns: + Contents of deploy_map as a dictionary of strings of app names with their corresponding sites """ return self.deploy_map @@ -104,7 +115,7 @@ def get_application(self, participant, fl_ctx: FLContext) -> bytes: job_def_manager = engine.get_component("job_manager") if not isinstance(job_def_manager, JobDefManagerSpec): raise TypeError(f"job_def_manager must be JobDefManagerSpec type. Got: {type(job_def_manager)}") - return job_def_manager.get_app(self, application_name) + return job_def_manager.get_app(self, application_name, fl_ctx) def get_application_name(self, participant): """Get the application name for the specified participant.""" @@ -115,7 +126,11 @@ def get_application_name(self, participant): return None def get_resource_requirements(self): - """Return app resource requirements.""" + """Returns app resource requirements. + + Returns: + A dict of {site_name: resource} + """ return self.resource_spec def __eq__(self, other): @@ -123,14 +138,13 @@ def __eq__(self, other): def job_from_meta(meta: dict) -> Job: - """Convert information in meta into a Job object. + """Converts information in meta into a Job object. Args: meta: dict of meta information Returns: - Job object. - + A Job object. """ job = Job( meta.get(JobMetaKey.JOB_ID), @@ -140,18 +154,3 @@ def job_from_meta(meta: dict) -> Job: meta, ) return job - - -def get_site_require_resource_from_job(job: Job): - """Get the total resource needed by each site to run this Job.""" - required_resources = job.get_resource_requirements() - deployment = job.get_deployment() - - total_required_resources = {} # {site name: total resources} - for app in required_resources: - for site_name in deployment[app]: - if site_name not in total_required_resources: - total_required_resources[site_name] = copy.deepcopy(required_resources[app]) - else: - total_required_resources[site_name] = total_required_resources[site_name] + required_resources[app] - return total_required_resources diff --git a/nvflare/apis/job_def_manager_spec.py b/nvflare/apis/job_def_manager_spec.py index bb9807cdc1a..f9f281ff30d 100644 --- a/nvflare/apis/job_def_manager_spec.py +++ b/nvflare/apis/job_def_manager_spec.py @@ -13,7 +13,7 @@ # limitations under the License. from abc import ABC, abstractmethod -from typing import Dict, List +from typing import Any, Dict, List from nvflare.apis.fl_component import FLComponent from nvflare.apis.fl_context import FLContext @@ -24,7 +24,7 @@ class JobDefManagerSpec(FLComponent, ABC): """Job Definition Management API.""" @abstractmethod - def create(self, meta: dict, uploaded_content: bytes, fl_ctx: FLContext) -> Job: + def create(self, meta: dict, uploaded_content: bytes, fl_ctx: FLContext) -> Dict[str, Any]: """Create a new job permanently. The caller must have validated the content already and created initial meta. Receives bytes of uploaded folder, @@ -35,22 +35,22 @@ def create(self, meta: dict, uploaded_content: bytes, fl_ctx: FLContext) -> Job: uploaded_content: data of the job definition fl_ctx (FLContext): FLContext information - Returns: a dict containing meta info. Additional meta info are added, especially - a unique Job ID (jid) which has been created. - + Returns: + A dict containing meta info. Additional meta info are added, especially + a unique Job ID (jid) which has been created. """ pass @abstractmethod def get_job(self, jid: str, fl_ctx: FLContext) -> Job: - """Get the Job object through the job ID. + """Gets the Job object through the job ID. Args: jid (str): Job ID fl_ctx (FLContext): FLContext information - Returns: Job object - + Returns: + A Job object """ pass @@ -63,8 +63,8 @@ def get_app(self, job: Job, app_name: str, fl_ctx: FLContext) -> bytes: app_name: name of the app to get fl_ctx (FLContext): FLContext information - Returns: content of the specified app in bytes - + Returns: + Content of the specified app in bytes """ pass @@ -76,8 +76,8 @@ def get_apps(self, job: Job, fl_ctx: FLContext) -> Dict[str, bytes]: job: Job object fl_ctx (FLContext): FLContext information - Returns: dictionary of app names with the content of the corresponding app encoded in bytes - + Returns: + A dictionary of app names with the content of the corresponding app encoded in bytes """ pass @@ -89,8 +89,8 @@ def get_content(self, jid: str, fl_ctx: FLContext) -> bytes: jid (str): Job ID fl_ctx (FLContext): FLContext information - Returns: uploaded content of the job in bytes - + Returns: + Uploaded content of the job in bytes """ pass @@ -119,38 +119,59 @@ def set_status(self, jid: str, status: RunStatus, fl_ctx: FLContext): pass @abstractmethod - def list_all(self, fl_ctx: FLContext) -> List[Job]: - """Return a list of all Jobs in the system. + def get_all_jobs(self, fl_ctx: FLContext) -> List[Job]: + """Gets all Jobs in the system. Args: fl_ctx (FLContext): FLContext information - Returns: list of all jobs as Job objects - + Returns: + A list of all jobs """ pass @abstractmethod def get_jobs_by_status(self, run_status: RunStatus, fl_ctx: FLContext) -> List[Job]: - """Return a list of Jobs of a specified status. + """Gets Jobs of a specified status. Args: run_status (RunStatus): status to filter for fl_ctx (FLContext): FLContext information - Returns: List of Jobs of the specified status. - + Returns: + A list of Jobs of the specified status """ pass @abstractmethod def get_jobs_waiting_for_review(self, reviewer_name: str, fl_ctx: FLContext) -> List[Job]: - """Return a list of Jobs waiting for review for the specified user.""" + """Gets Jobs waiting for review for the specified user. + + Args: + reviewer_name (str): reviewer name + fl_ctx (FLContext): FLContext information + + Returns: + A list of Jobs waiting for review for the specified user. + """ pass @abstractmethod - def set_approval(self, jid: str, reviewer_name: str, approved: bool, note: str, fl_ctx: FLContext) -> Job: - """Sets the approval for the specified user for a certain Job.""" + def set_approval( + self, jid: str, reviewer_name: str, approved: bool, note: str, fl_ctx: FLContext + ) -> Dict[str, Any]: + """Sets the approval for the specified user for a certain Job. + + Args: + jid (str): job id + reviewer_name (str): reviewer name + approved (bool): whether job is approved + note (str): any note message + fl_ctx (FLContext): FLContext information + + Returns: + A dictionary of Job metadata. + """ pass @abstractmethod diff --git a/nvflare/private/fed/server/job_cmds.py b/nvflare/private/fed/server/job_cmds.py index 86f2095addb..c2227480dcb 100644 --- a/nvflare/private/fed/server/job_cmds.py +++ b/nvflare/private/fed/server/job_cmds.py @@ -16,14 +16,16 @@ import logging from typing import List +from nvflare.apis.job_def_manager_spec import JobDefManagerSpec from nvflare.fuel.hci.conn import Connection from nvflare.fuel.hci.reg import CommandModule, CommandModuleSpec, CommandSpec +from nvflare.private.fed.server.server_engine import ServerEngine class JobCommandModule(CommandModule): """Command module with commands for job management.""" - def __init__(self): # , job_def_manager: JobDefManagerSpec): + def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) def get_spec(self): @@ -65,16 +67,29 @@ def get_spec(self): def list_all_jobs(self, conn: Connection, args: List[str]): engine = conn.app_ctx - jobs = engine.job_def_manager.list_all() - if jobs: - conn.append_string("Jobs:") - for job in jobs: - conn.append_string(job.job_id) - conn.append_string("\nJob details for each job:") - for job in jobs: - conn.append_string(json.dumps(job.meta, indent=4)) - else: - conn.append_string("No jobs.") + try: + if not isinstance(engine, ServerEngine): + raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}") + job_def_manager = engine.job_def_manager + if not isinstance(job_def_manager, JobDefManagerSpec): + raise TypeError( + f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}" + ) + + with engine.new_context() as fl_ctx: + jobs = job_def_manager.get_all_jobs(fl_ctx) + if jobs: + conn.append_string("Jobs:") + for job in jobs: + conn.append_string(job.job_id) + conn.append_string("\nJob details for each job:") + for job in jobs: + conn.append_string(json.dumps(job.meta, indent=4)) + else: + conn.append_string("No jobs.") + except Exception as e: + conn.append_error("exception occurred getting job details: " + str(e)) + return conn.append_success("") def get_job_details(self, conn: Connection, args: List[str]): @@ -83,9 +98,16 @@ def get_job_details(self, conn: Connection, args: List[str]): job_id = args[1] engine = conn.app_ctx try: - job = engine.job_def_manager.get_job(job_id) - job_meta = job.meta - conn.append_string(json.dumps(job_meta, indent=4)) + if not isinstance(engine, ServerEngine): + raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}") + job_def_manager = engine.job_def_manager + if not isinstance(job_def_manager, JobDefManagerSpec): + raise TypeError( + f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}" + ) + with engine.new_context() as fl_ctx: + job = job_def_manager.get_job(job_id, fl_ctx) + conn.append_string(json.dumps(job.meta, indent=4)) except Exception as e: conn.append_error("exception occurred getting job details: " + str(e)) return @@ -96,7 +118,15 @@ def delete_job(self, conn: Connection, args: List[str]): job_id = args[1] engine = conn.app_ctx try: - engine.job_def_manager.delete(job_id) + if not isinstance(engine, ServerEngine): + raise TypeError(f"engine is not of type ServerEngine, but got {type(engine)}") + job_def_manager = engine.job_def_manager + if not isinstance(job_def_manager, JobDefManagerSpec): + raise TypeError( + f"job_def_manager in engine is not of type JobDefManagerSpec, but got {type(job_def_manager)}" + ) + with engine.new_context() as fl_ctx: + job_def_manager.delete(job_id, fl_ctx) conn.append_string("Job {} deleted.".format(job_id)) except Exception as e: conn.append_error("exception occurred: " + str(e)) diff --git a/nvflare/private/fed/server/server_engine.py b/nvflare/private/fed/server/server_engine.py index 25c5365014d..47d2dbe8b52 100644 --- a/nvflare/private/fed/server/server_engine.py +++ b/nvflare/private/fed/server/server_engine.py @@ -36,26 +36,25 @@ MachineStatus, ReservedTopic, ReturnCode, + RunProcessKey, ServerCommandKey, ServerCommandNames, SnapshotKey, ) -from nvflare.apis.fl_constant import RunProcessKey from nvflare.apis.fl_context import FLContext -from nvflare.apis.fl_snapshot import RunSnapshot, FLSnapshot +from nvflare.apis.fl_snapshot import FLSnapshot, RunSnapshot from nvflare.apis.impl.job_def_manager import SimpleJobDefManager -from nvflare.apis.impl.study_manager import StudyManager from nvflare.apis.shareable import Shareable, make_reply from nvflare.apis.utils.common_utils import get_open_ports from nvflare.apis.utils.fl_context_utils import get_serializable_data from nvflare.apis.workspace import Workspace -from nvflare.app_common.storages.filesystem_storage import FilesystemStorage from nvflare.fuel.hci.zip_utils import zip_directory_to_bytes from nvflare.private.admin_defs import Message from nvflare.private.defs import RequestHeader, WorkspaceConstants from nvflare.private.fed.server.server_json_config import ServerJsonConfigurator from nvflare.widgets.info_collector import InfoCollector from nvflare.widgets.widget import Widget, WidgetID + from .client_manager import ClientManager from .run_manager import RunManager from .server_engine_internal_spec import EngineInfo, RunInfo, ServerEngineInternalSpec @@ -95,10 +94,8 @@ def __init__(self, server, args, client_manager: ClientManager, snapshot_persist self.lock = Lock() self.logger = logging.getLogger(self.__class__.__name__) - # self.job_def_manager = job_def_manager # todo: need to figure out how to initialize job manager with inputs - self.job_def_manager = SimpleJobDefManager( - StudyManager(FilesystemStorage()), FilesystemStorage(root_dir="/workspace/nvflare_provis") - ) + # TODO:: need to figure out how to initialize job manager with inputs + self.job_def_manager = SimpleJobDefManager() self.asked_to_stop = False self.snapshot_persistor = snapshot_persistor @@ -188,8 +185,7 @@ def start_app_on_server(self, run_number: str, snapshot=None) -> str: app_custom_folder = os.path.join(app_root, "custom") open_ports = get_open_ports(2) - self._start_runner_process(self.args, app_root, run_number, app_custom_folder, - open_ports, snapshot) + self._start_runner_process(self.args, app_root, run_number, app_custom_folder, open_ports, snapshot) threading.Thread(target=self._listen_command, args=(open_ports[0], run_number)).start() @@ -218,8 +214,9 @@ def _listen_command(self, listen_port, run_number): request = data.get("request") timeout = data.get("timeout") fl_ctx = data.get("fl_ctx") - replies = self.aux_send(targets=targets, topic=topic, request=request, - timeout=timeout, fl_ctx=fl_ctx) + replies = self.aux_send( + targets=targets, topic=topic, request=request, timeout=timeout, fl_ctx=fl_ctx + ) conn.send(replies) except: self.logger.warning("Failed to process the child process command.") @@ -253,13 +250,21 @@ def _start_runner_process(self, args, app_root, run_number, app_custom_folder, o for t in args.set: command_options += " " + t command = ( - sys.executable + " -m nvflare.private.fed.app.server.runner_process -m " - + args.workspace - + " -s fed_server.json -r " + app_root - + " -n " + str(run_number) - + " -p " + str(listen_port) - + " -c " + str(open_ports[0]) - + " --set" + command_options + " print_conf=True restore_snapshot=" + str(restore_snapshot) + sys.executable + + " -m nvflare.private.fed.app.server.runner_process -m " + + args.workspace + + " -s fed_server.json -r " + + app_root + + " -n " + + str(run_number) + + " -p " + + str(listen_port) + + " -c " + + str(open_ports[0]) + + " --set" + + command_options + + " print_conf=True restore_snapshot=" + + str(restore_snapshot) ) # use os.setsid to create new process group ID @@ -269,12 +274,13 @@ def _start_runner_process(self, args, app_root, run_number, app_custom_folder, o threading.Thread(target=self.wait_for_complete, args=[run_number]).start() with self.lock: - self.run_processes[run_number] = {RunProcessKey.LISTEN_PORT: listen_port, - RunProcessKey.CONNECTION: None, - RunProcessKey.CHILD_PROCESS: process, - # TODO: each run will have its own participants. Use all clients for now. - RunProcessKey.PARTICIPANTS: self.client_manager.clients - } + self.run_processes[run_number] = { + RunProcessKey.LISTEN_PORT: listen_port, + RunProcessKey.CONNECTION: None, + RunProcessKey.CHILD_PROCESS: process, + # TODO: each run will have its own participants. Use all clients for now. + RunProcessKey.PARTICIPANTS: self.client_manager.clients, + } return process def remove_custom_path(self): @@ -507,13 +513,16 @@ def send_aux_request(self, targets: [], topic: str, request: Shareable, timeout: def parent_aux_send(self, targets: [], topic: str, request: Shareable, timeout: float, fl_ctx: FLContext) -> dict: with self.parent_conn_lock: - data = {ServerCommandKey.COMMAND: ServerCommandNames.AUX_SEND, - ServerCommandKey.DATA: {"targets": targets, - "topic": topic, - "request": request, - "timeout": timeout, - "fl_ctx": get_serializable_data(fl_ctx)} - } + data = { + ServerCommandKey.COMMAND: ServerCommandNames.AUX_SEND, + ServerCommandKey.DATA: { + "targets": targets, + "topic": topic, + "request": request, + "timeout": timeout, + "fl_ctx": get_serializable_data(fl_ctx), + }, + } self.parent_conn.send(data) return_data = self.parent_conn.recv() return return_data @@ -611,8 +620,7 @@ def show_stats(self, run_number): with self.lock: command_conn = self.get_command_conn(run_number) if command_conn: - data = {ServerCommandKey.COMMAND: ServerCommandNames.SHOW_STATS, - ServerCommandKey.DATA: {}} + data = {ServerCommandKey.COMMAND: ServerCommandNames.SHOW_STATS, ServerCommandKey.DATA: {}} command_conn.send(data) stats = command_conn.recv() except: @@ -626,8 +634,7 @@ def get_errors(self, run_number): with self.lock: command_conn = self.get_command_conn(run_number) if command_conn: - data = {ServerCommandKey.COMMAND: ServerCommandNames.GET_ERRORS, - ServerCommandKey.DATA: {}} + data = {ServerCommandKey.COMMAND: ServerCommandNames.GET_ERRORS, ServerCommandKey.DATA: {}} command_conn.send(data) stats = command_conn.recv() except: