Merge pull request #2094 from FedML-AI/alexleung/dev_branch_latest
[CoreEngine] make the deployment status more stable.
fedml-alex authored May 14, 2024
2 parents 11fd137 + 3417f30 commit c05f23a
Showing 12 changed files with 91 additions and 46 deletions.
6 changes: 3 additions & 3 deletions python/fedml/api/api_test.py
@@ -4,7 +4,7 @@
 import fedml
 
 # Login
-fedml.set_env_version("local")
+fedml.set_env_version("test")
 fedml.set_local_on_premise_platform_port(18080)
 error_code, error_msg = fedml.api.fedml_login(api_key="1316b93c82da40ce90113a2ed12f0b14")
 if error_code != 0:
@@ -19,7 +19,7 @@
 
 # Launch job
 launch_result_list = list()
-for i in range(0, 1):
+for i in range(0, 10):
     launch_result = fedml.api.launch_job(yaml_file)
     launch_result_list.append(launch_result)
     # launch_result = fedml.api.launch_job_on_cluster(yaml_file, "alex-cluster")
@@ -33,7 +33,7 @@
     if log_result is None or log_result.run_status is None:
         print(f"Failed to get job status.")
         #exit(1)
-    print(f"Run status {log_result.run_status}")
+    print(f"Run {launch_result.run_id}, status {log_result.run_status}")
     time.sleep(0.5)
 
 # Get job logs

@@ -33,8 +33,7 @@ def send_deployment_results(self, end_point_id, end_point_name,
                                 model_name, model_inference_url,
                                 model_version, inference_port, inference_engine,
                                 model_metadata, model_config, input_json, output_json, replica_id_list=None):
-        deployment_results_topic_prefix = "model_ops/model_device/return_deployment_result"
-        deployment_results_topic = "{}/{}".format(deployment_results_topic_prefix, end_point_id)
+        deployment_results_topic = "model_ops/model_device/return_deployment_result"
         deployment_results_payload = {"end_point_id": end_point_id, "end_point_name": end_point_name,
                                       "model_name": model_name, "model_url": model_inference_url,
                                       "version": model_version, "port": inference_port,
@@ -48,15 +47,13 @@ def send_deployment_results(self, end_point_id, end_point_name,
         logging.info(f"[Master] deployment_results_payload is sent to mlops: {deployment_results_payload}")
 
         self.message_center.send_message_json(deployment_results_topic, json.dumps(deployment_results_payload))
-        self.message_center.send_message_json(deployment_results_topic_prefix, json.dumps(deployment_results_payload))
 
     @staticmethod
     def send_deployment_status(
             end_point_id, end_point_name, model_name, model_inference_url, model_status, message_center=None):
         if message_center is None:
             return
-        deployment_status_topic_prefix = "model_ops/model_device/return_deployment_status"
-        deployment_status_topic = "{}/{}".format(deployment_status_topic_prefix, end_point_id)
+        deployment_status_topic = "model_ops/model_device/return_deployment_status"
         deployment_status_payload = {"end_point_id": end_point_id, "end_point_name": end_point_name,
                                      "model_name": model_name,
                                      "model_url": model_inference_url,
@@ -65,16 +62,14 @@ def send_deployment_status(
         logging.info(f"[Master] deployment_status_payload is sent to mlops: {deployment_status_payload}")
 
         message_center.send_message_json(deployment_status_topic, json.dumps(deployment_status_payload))
-        message_center.send_message_json(deployment_status_topic_prefix, json.dumps(deployment_status_payload))
 
     @staticmethod
     def send_deployment_stages(end_point_id, model_name, model_id, model_inference_url,
                                model_stages_index, model_stages_title, model_stage_detail,
                                message_center=None):
         if message_center is None:
             return
-        deployment_stages_topic_prefix = "model_ops/model_device/return_deployment_stages"
-        deployment_stages_topic = "{}/{}".format(deployment_stages_topic_prefix, end_point_id)
+        deployment_stages_topic = "model_ops/model_device/return_deployment_stages"
         deployment_stages_payload = {"model_name": model_name,
                                      "model_id": model_id,
                                      "model_url": model_inference_url,
@@ -85,7 +80,6 @@ def send_deployment_stages(end_point_id, model_name, model_id, model_inference_url,
                                      "timestamp": int(format(time.time_ns() / 1000.0, '.0f'))}
 
         message_center.send_message_json(deployment_stages_topic, json.dumps(deployment_stages_payload))
-        message_center.send_message_json(deployment_stages_topic_prefix, json.dumps(deployment_stages_payload))
 
         logging.info(f"-------- Stages has been sent to mlops with stage {model_stages_index} and "
                      f"payload {deployment_stages_payload}")

@@ -41,6 +41,7 @@ def __init__(self, args, run_id=0, request_json=None, agent_config=None, edge_id
             agent_log_file_dir=ServerConstants.get_log_file_dir()
         )
 
+        self.is_deployment_runner = True
         self.infer_host = "127.0.0.1"
         self.redis_addr = "local"
         self.redis_port = "6379"
@@ -306,7 +307,7 @@ def process_deployment_result_message(self, topic=None, payload=None):
                 return
             else:
                 # This is the last worker that failed, so we should continue to "ABORTED" status
-                model_config_parameters = self.running_request_json[run_id_str]["parameters"]
+                model_config_parameters = self.request_json["parameters"]
                 inference_port = model_config_parameters.get("server_internal_port",
                                                              ServerConstants.MODEL_INFERENCE_DEFAULT_PORT)
                 inference_port_external = model_config_parameters.get("server_external_port", inference_port)

@@ -18,6 +18,7 @@ def __init__(self, args, agent_config=None):
         FedMLBaseMasterProtocolManager.__init__(self, args, agent_config=agent_config)
 
         self.message_center_name = "deploy_master_agent"
+        self.is_deployment_status_center = True
 
         self.topic_start_deployment = None
         self.topic_activate_endpoint = None
@@ -215,18 +216,6 @@ def callback_start_deployment(self, topic, payload):
 
         self.subscribe_deployment_messages_from_slave_devices(request_json)
 
-        # Report stage to mlops: MODEL_DEPLOYMENT_STAGE1 = "Received"
-        FedMLDeployJobRunnerManager.get_instance().send_deployment_stages(
-            run_id, model_name, model_id, "", ServerConstants.MODEL_DEPLOYMENT_STAGE1["index"],
-            ServerConstants.MODEL_DEPLOYMENT_STAGE1["text"], "Received request for endpoint {}".format(run_id),
-            message_center=self.message_center)
-
-        # Report stage to mlops: MODEL_DEPLOYMENT_STAGE2 = "Initializing"
-        FedMLDeployJobRunnerManager.get_instance().send_deployment_stages(
-            run_id, model_name, model_id, "", ServerConstants.MODEL_DEPLOYMENT_STAGE2["index"],
-            ServerConstants.MODEL_DEPLOYMENT_STAGE2["text"], ServerConstants.MODEL_DEPLOYMENT_STAGE2["text"],
-            message_center=self.message_center)
-
         ServerConstants.save_runner_infos(self.args.device_id + "." + self.args.os_name, self.edge_id, run_id=run_id)
 
         # Num diff
@@ -260,6 +249,18 @@ def callback_start_deployment(self, topic, payload):
         if process is not None:
             ServerConstants.save_run_process(run_id, process.pid)
 
+        # Report stage to mlops: MODEL_DEPLOYMENT_STAGE1 = "Received"
+        FedMLDeployJobRunnerManager.get_instance().send_deployment_stages(
+            run_id, model_name, model_id, "", ServerConstants.MODEL_DEPLOYMENT_STAGE1["index"],
+            ServerConstants.MODEL_DEPLOYMENT_STAGE1["text"], "Received request for endpoint {}".format(run_id),
+            message_center=self.message_center)
+
+        # Report stage to mlops: MODEL_DEPLOYMENT_STAGE2 = "Initializing"
+        FedMLDeployJobRunnerManager.get_instance().send_deployment_stages(
+            run_id, model_name, model_id, "", ServerConstants.MODEL_DEPLOYMENT_STAGE2["index"],
+            ServerConstants.MODEL_DEPLOYMENT_STAGE2["text"], ServerConstants.MODEL_DEPLOYMENT_STAGE2["text"],
+            message_center=self.message_center)
+
         # Send stage: MODEL_DEPLOYMENT_STAGE3 = "StartRunner"
         FedMLDeployJobRunnerManager.get_instance().send_deployment_stages(
             run_id, model_name, model_id, "", ServerConstants.MODEL_DEPLOYMENT_STAGE3["index"],
@@ -328,6 +329,8 @@ def subscribe_deployment_messages_from_slave_devices(self, request_json):
 
         logging.info("subscribe device messages {}".format(deployment_results_topic))
 
+        self.setup_listeners_for_edge_status(run_id, edge_id_list, self.edge_id)
+
     def subscribe_spec_device_message(self, run_id, device_id):
         if device_id == self.edge_id:
             return

@@ -31,6 +31,7 @@ def __init__(self, args, run_id=0, request_json=None, agent_config=None, edge_id
             agent_log_file_dir=ClientConstants.get_log_file_dir()
         )
 
+        self.is_deployment_runner = True
         self.infer_host = "127.0.0.1"
         self.redis_addr = "local"
         self.redis_port = "6379"
@@ -286,11 +287,8 @@ def run_impl(self, run_extend_queue_list, sender_message_center,
                                              inference_engine, model_metadata, model_config)
 
             self.status_reporter.run_id = self.run_id
-            self.status_reporter.report_client_id_status(
-                self.edge_id, ClientConstants.MSG_MLOPS_CLIENT_STATUS_FAILED,
-                is_from_model=True, run_id=self.run_id)
 
-            return False
+            raise Exception("[Worker] Failed to deploy the model.")
         else:
             # Send failed successful result back to master
             logging.info("Finished deployment, continue to send results to master...")

@@ -24,6 +24,7 @@ def __init__(self, args, agent_config=None):
         FedMLBaseSlaveProtocolManager.__init__(self, args, agent_config=agent_config)
 
         self.message_center_name = "deploy_slave_agent"
+        self.is_deployment_status_center = True
 
         self.topic_start_deployment = None
         self.topic_delete_deployment = None

@@ -44,6 +44,19 @@ class GeneralConstants:
     MSG_MLOPS_SERVER_STATUS_FINISHED = "FINISHED"
     MSG_MLOPS_SERVER_STATUS_EXCEPTION = "EXCEPTION"
 
+    MSG_MODELOPS_DEPLOYMENT_STATUS_INITIALIZING = "INITIALIZING"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYING = "DEPLOYING"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_INFERRING = "INFERRING"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_OVERLOAD = "OVERLOAD"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED = "FAILED"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_RESCALING = "RESCALING"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_UPDATING = "UPDATING"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_UPDATING_FAILED = "UPDATING_FAILED"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_ABORTING = "ABORTING"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_ABORTED = "ABORTED"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_DEPLOYED = "DEPLOYED"
+    MSG_MODELOPS_DEPLOYMENT_STATUS_KILLED = "KILLED"
+
     MASTER_LOGIN_PROGRAM = "server_login.py"
     SLAVE_LOGIN_PROGRAM = "client_login.py"
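
The new MSG_MODELOPS_DEPLOYMENT_STATUS_* constants enumerate the endpoint lifecycle that the status center reports on. As a hypothetical illustration (not part of this commit, and one plausible partition only; the exact semantics are FedML's), a poller that should stop once an endpoint settles might treat the statuses like this:

# Literal values mirror the GeneralConstants entries added above.
TERMINAL_DEPLOYMENT_STATUSES = {"DEPLOYED", "FAILED", "UPDATING_FAILED",
                                "ABORTED", "KILLED"}

def is_terminal(status):
    # INITIALIZING, DEPLOYING, INFERRING, OVERLOAD, RESCALING, UPDATING and
    # ABORTING are treated here as transient; the set above as final.
    return status in TERMINAL_DEPLOYMENT_STATUSES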

@@ -1,12 +1,14 @@
 import json
 import logging
+import multiprocessing
 import os
 import platform
 import random
 import shutil
 import time
 import traceback
 import zipfile
+import queue
 from ..comm_utils.constants import SchedulerConstants
 from ..comm_utils.job_utils import JobRunnerUtils, DockerArgs
 from ..scheduler_entry.constants import Constants
@@ -82,8 +84,7 @@ def __init__(self, args, edge_id=0, request_json=None, agent_config=None, run_id
             "${FEDSYS.CLIENT_OBJECT_LIST}": "",
             "${FEDSYS.LOG_SERVER_URL}": "",
         }
-        self.download_time = time.time()
-        self.download_finished = False
+        self.is_deployment_runner = False
 
     def __repr__(self):
         return "<{klass} @{id:x} {attrs}>".format(
@@ -162,7 +163,7 @@ def package_download_progress(self, count, blksize, filesize):
         self.prev_download_progress = progress_int
         logging.info("package downloaded size {} KB, progress {}%".format(downloaded_kb, progress_int))
 
-    def download_package_proc(self, package_url, local_package_file):
+    def download_package_proc(self, package_url, local_package_file, completed_event, info_queue):
         import requests
         headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                                  'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}
@@ -188,8 +189,8 @@ def download_package_proc(self, package_url, local_package_file):
                     written_size = f.write(chunk)
                     total_size += written_size
                     logging.info("package downloaded size %.2f KB", total_size/1024)
-        self.download_time = time.time()
-        self.download_finished = True
+        info_queue.put(time.time())
+        completed_event.set()
 
     def retrieve_and_unzip_package(self, package_name, package_url):
         local_package_path = self.agent_package_download_dir
@@ -202,26 +203,43 @@ def retrieve_and_unzip_package(self, package_name, package_url):
         ssl._create_default_https_context = ssl._create_unverified_context
 
         # Open a process to download the package so that we can avoid the request is blocked and check the timeout.
-        self.download_finished = False
-        self.download_time = time.time()
         from multiprocessing import Process
-        download_process = Process(target=self.download_package_proc, args=(package_url, local_package_file))
+        completed_event = multiprocessing.Event()
+        info_queue = multiprocessing.Queue()
+        download_process = Process(target=self.download_package_proc,
+                                   args=(package_url, local_package_file, completed_event, info_queue))
         download_process.start()
-        allowed_block_download_time = 30
+        allowed_block_download_time = 60
+        download_finished = False
+        download_time = time.time()
         while True:
-            block_time = time.time() - self.download_time
+            try:
+                queue_time = info_queue.get(block=False, timeout=3)
+                download_time = queue_time
+            except queue.Empty as e:
+                pass
+
+            block_time = time.time() - download_time
             if block_time > allowed_block_download_time:
                 break
-            if self.download_finished:
+
+            if completed_event.is_set():
+                download_finished = True
                 break
             time.sleep(3)
         try:
-            if not self.download_finished:
+            if not download_finished:
                 download_process.terminate()
                 download_process.kill()
         except Exception as e:
             pass
 
+        if not download_finished:
+            raise Exception("Download timeout, please check if your network is stable.")
+
+        if not os.path.exists(local_package_file):
+            raise Exception(f"Failed to download, the zip file is not exist at {local_package_file}.")
+
         # Another method to async download.
         # import socket
         # socket.setdefaulttimeout(15)
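
Why this hunk matters: download_package_proc runs in a child Process, and a child assigning self.download_finished = True only mutates its own copy of the object, so the parent's watchdog loop never saw progress or completion. multiprocessing.Event and multiprocessing.Queue do cross the process boundary. A self-contained sketch of the same watchdog pattern, with a stand-in worker in place of the real HTTP download:

import multiprocessing
import queue
import time

def fake_download(completed_event, info_queue):
    # Stand-in for download_package_proc: emit heartbeats, then finish.
    for _ in range(3):
        time.sleep(1)
        info_queue.put(time.time())   # progress heartbeat visible to the parent
    completed_event.set()             # completion flag visible to the parent

if __name__ == "__main__":
    completed_event = multiprocessing.Event()
    info_queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=fake_download,
                                   args=(completed_event, info_queue))
    proc.start()
    allowed_block_time = 60           # seconds without a heartbeat before giving up
    last_heartbeat = time.time()
    finished = False
    while True:
        try:
            last_heartbeat = info_queue.get(block=False)   # drain latest heartbeat
        except queue.Empty:
            pass
        if time.time() - last_heartbeat > allowed_block_time:
            break                     # stalled: no heartbeat inside the window
        if completed_event.is_set():
            finished = True
            break
        time.sleep(3)
    if not finished:
        proc.terminate()              # kill the stalled worker
        raise Exception("Download timeout, please check if your network is stable.")
    proc.join()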

@@ -224,6 +224,7 @@ def start_status_listener_center(self):
 
     def rebuild_status_center(self, status_center_queue):
         self.status_center = FedMLStatusCenter(message_queue=status_center_queue)
+        self.status_center.is_deployment_status_center = self.is_deployment_status_center
 
         if self.status_reporter is None:
             self.status_reporter = MLOpsMetrics()

@@ -99,6 +99,7 @@ def __init__(self, message_queue=None):
         self.status_message_center = None
         self.status_manager_instance = None
         self.status_runner = None
+        self.is_deployment_status_center = False
 
     def __repr__(self):
         return "<{klass} @{id:x} {attrs}>".format(

@@ -2,6 +2,7 @@
 import logging
 import os
 import shutil
+import time
 from os import listdir
 
 from ....core.mlops.mlops_runtime_log_daemon import MLOpsRuntimeLogDaemon
@@ -67,6 +68,9 @@ def process_job_completed_status(self, master_id, status):
         # self.remove_listener_for_run_metrics(self.run_id)
         # self.remove_listener_for_run_logs(self.run_id)
 
+        if self.status_center.is_deployment_status_center and status == ServerConstants.MSG_MLOPS_SERVER_STATUS_FAILED:
+            self.report_deployment_status(self.run_id, GeneralConstants.MSG_MODELOPS_DEPLOYMENT_STATUS_FAILED)
+
     def process_job_exception_status(self, master_id, status):
         # Send the exception status to slave devices.
         self.report_exception_status(
@@ -302,3 +306,14 @@ def status_center_request_job_status_from_master_in_slave_agent(self, topic, payload):
         topic_request_job_status = f"{GeneralConstants.MSG_TOPIC_REQUEST_JOB_STATUS_PREFIX}{master_id}"
         payload_request_job_status = {"run_id": run_id, "edge_id": edge_id}
         self.message_center.send_message(topic_request_job_status, json.dumps(payload_request_job_status))
+
+    def report_deployment_status(self, run_id, status):
+        deployment_status_topic = "model_ops/model_device/return_deployment_status"
+        deployment_status_payload = {"end_point_id": run_id, "end_point_name": "",
+                                     "model_name": "",
+                                     "model_url": "",
+                                     "model_status": status,
+                                     "timestamp": int(format(time.time_ns() / 1000.0, '.0f'))}
+        logging.info(f"[StatusCenter] deployment_status_payload is sent to mlops: {deployment_status_payload}")
+
+        self.message_center.send_message_json(deployment_status_topic, json.dumps(deployment_status_payload))
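
The report_deployment_status helper above lets the status center publish a terminal FAILED status on the same fixed topic the master uses, so the backend still learns the endpoint's fate when a job dies outside the normal result path. For context, a hypothetical consumer of that topic, written against paho-mqtt 1.x (FedML's actual broker transport and credentials are not shown in this diff):

import json
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload)
    print(f"endpoint {payload['end_point_id']}: {payload['model_status']}")

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)  # assumption: a reachable MQTT broker
client.subscribe("model_ops/model_device/return_deployment_status")
client.loop_forever()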