Merge pull request #2081 from FedML-AI/raphael/fix-autoscale
[Deploy] Handle Exception during autoscaling decision; Fix a compatibility issue.
Raphael-Jin authored May 4, 2024
2 parents dfe8a27 + addd91c commit 4b38d38
Showing 2 changed files with 87 additions and 84 deletions.
168 changes: 86 additions & 82 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -104,91 +104,95 @@ def autoscaler_reconcile_after_interval(self):
This hunk wraps the body of the per-endpoint loop in a new try/except, so that an exception raised while computing or applying the autoscaling decision for one endpoint is logged instead of aborting the reconcile pass for the remaining endpoints. Only the lines marked with a leading + are new; everything else is the pre-existing body of the if block, shown once at its new indentation (that re-indentation accounts for most of the 86 additions and 82 deletions).

for endpoint_settings in endpoints_settings_list:
    endpoint_state = endpoint_settings["state"]
    if endpoint_state == "DEPLOYED" and endpoint_settings["enable_auto_scaling"]:
+       try:  # Should not let one endpoint affect the others
            logging.info(f"After interval, check the autoscaler for async future list."
                         f"{self.endpoints_autoscale_predict_future}")
            # TODO(fedml-dimitris): The policy can be set dynamically or be user specific.
            # Set the policy, here we use latency, but other metrics are possible as well, such as qps.
            # For more advanced use cases look for the testing scripts under the autoscaler/test directory.
            autoscaling_policy_config = \
                {
                    "current_replicas": int(endpoint_settings["replica_num"]),
                    "min_replicas": int(endpoint_settings["scale_min"]),
                    "max_replicas": int(endpoint_settings["scale_max"]),
                    "queries_per_replica": int(endpoint_settings["target_queries_per_replica"]),
                    "window_size_secs": int(endpoint_settings["aggregation_window_size_seconds"]),
                    "scaledown_delay_secs": int(endpoint_settings["scale_down_delay_seconds"]),
                }
            autoscaling_policy = ConcurrentQueryPolicy(**autoscaling_policy_config)

            e_id, e_name, model_name = endpoint_settings["endpoint_id"], endpoint_settings["endpoint_name"], \
                endpoint_settings["model_name"]

            logging.info(f"Querying the autoscaler for endpoint {e_id} with user settings {endpoint_settings}.")

            # For every endpoint we just update the policy configuration.
            autoscaling_policy.min_replicas = endpoint_settings["scale_min"]
            autoscaling_policy.max_replicas = endpoint_settings["scale_max"]
            # We retrieve a list of replicas for every endpoint. The number
            # of running replicas is the length of that list.
            current_replicas = len(fedml_model_cache.get_endpoint_replicas_results(e_id))
            autoscaling_policy.current_replicas = current_replicas
            logging.info(f"Endpoint {e_id} autoscaling policy: {autoscaling_policy}.")

            scale_op = autoscaler.scale_operation_endpoint(
                autoscaling_policy,
                str(e_id))

            new_replicas = current_replicas + scale_op.value

            logging.info(f"Scaling operation {scale_op.value} for endpoint {e_id} .")
            logging.info(f"New Replicas {new_replicas} for endpoint {e_id} .")
            logging.info(f"Current Replicas {current_replicas} for endpoint {e_id} .")
            if current_replicas == new_replicas:
                # Basically the autoscaler decided that no scaling operation should take place.
                logging.info(f"No scaling operation for endpoint {e_id}.")
                return

            # Should scale in / out
            curr_version = fedml.get_env_version()

            if curr_version == "release":
                mlops_prefix = "https://open.fedml.ai/"
            elif curr_version == "test":
                mlops_prefix = "https://open-test.fedml.ai/"
            else:
                logging.error(f"Do not support the version {curr_version}.")
                return
            autoscale_url_path = "fedmlModelServer/api/v1/endpoint/auto-scale"
            url = f"{mlops_prefix}{autoscale_url_path}"

            # Get cached token for authorization of autoscale request
            cached_token = fedml_model_cache.get_end_point_token(e_id, e_name, model_name)
            if cached_token is None:
                logging.error(f"Failed to get the cached token for endpoint {e_id}.")
                return

            req_header = {
                "Authorization": f"Bearer {cached_token}"
            }
            req_body = {
                "endpointId": int(e_id),
                "replicasDesired": int(new_replicas)
            }

            try:
                logging.info(f"Sending the autoscale request to MLOps platform. url {url}, "
                             f"body {req_body}., header {req_header}")
                response = requests.post(
                    url,
                    headers=req_header,
                    json=req_body
                )
                if response.status_code != 200:
                    logging.error(f"Failed to send the autoscale request to MLOps platform.")
                else:
                    logging.info(f"Successfully sent the autoscale request to MLOps platform.")
            except Exception as e:
                logging.error(f"Failed to send the autoscale request to MLOps platform. {e}")
+       except Exception as e:
+           logging.error(f"Error in autoscaler reconcile after interval. {e}")
+           pass
return

@staticmethod
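Below is a minimal, self-contained sketch (not FedML code) of the pattern this hunk applies: each endpoint is reconciled inside its own try/except so a failure is logged and the loop moves on, and the scaling decision is applied as a signed delta, mirroring new_replicas = current_replicas + scale_op.value above. The ScaleOp values, decide_scale_op, apply_scaling, and the endpoint dicts are hypothetical stand-ins; also note that, unlike the committed code, the sketch continues to the next endpoint rather than returning when no change is needed.

import logging
import random
from enum import Enum


class ScaleOp(Enum):
    # Assumed integer deltas, since the hunk above adds scale_op.value to current_replicas.
    SCALE_DOWN = -1
    NO_OP = 0
    SCALE_UP = 1


def decide_scale_op(endpoint):
    # Hypothetical stand-in for autoscaler.scale_operation_endpoint(); picks an op at random.
    return random.choice(list(ScaleOp))


def apply_scaling(endpoint_id, replicas_desired):
    # Hypothetical stand-in for the autoscale request sent to the MLOps platform.
    logging.info("endpoint %s: requesting %d replicas", endpoint_id, replicas_desired)


def reconcile(endpoints):
    for ep in endpoints:
        try:  # one endpoint's failure must not abort the others
            current = int(ep["replica_num"])
            op = decide_scale_op(ep)
            new_replicas = current + op.value
            if new_replicas == current:
                continue  # no scaling needed for this endpoint
            apply_scaling(ep["endpoint_id"], new_replicas)
        except Exception as e:
            logging.error("Error reconciling endpoint %s: %s", ep.get("endpoint_id"), e)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    reconcile([
        {"endpoint_id": 1, "replica_num": 2},
        {"endpoint_id": 2},  # missing "replica_num" raises here, but the loop continues
        {"endpoint_id": 3, "replica_num": 1},
    ])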
3 changes: 1 addition & 2 deletions python/setup.py
@@ -47,14 +47,13 @@ def finalize_options(self):
  'prettytable',
  'py-machineid',
  'pydantic',
- 'pydantic-settings',
  'pytest',
  'pytest-mock',
  'python-rapidjson>=0.9.1',
  'redis',
  'scikit-learn',
  'smart-open==6.3.0',
- 'spacy',
+ 'spacy>=3.2.0,<3.3.0',
  'sqlalchemy',
  'toposort',
  'torch>=1.13.1',
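The compatibility fix referenced in the commit message appears to be the spacy pin: the unconstrained 'spacy' requirement is replaced by 'spacy>=3.2.0,<3.3.0'. A quick way to check a local environment against the new range (a sketch that assumes Python 3.8+ and the third-party packaging library):

from importlib.metadata import version          # Python 3.8+
from packaging.specifiers import SpecifierSet   # assumes the packaging library is installed

spec = SpecifierSet(">=3.2.0,<3.3.0")   # the range pinned in this commit
installed = version("spacy")            # raises PackageNotFoundError if spacy is absent
print(f"spacy {installed} satisfies {spec}: {installed in spec}")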
