diff --git a/python/fedml/computing/scheduler/comm_utils/job_monitor.py b/python/fedml/computing/scheduler/comm_utils/job_monitor.py
index 48b0146b53..32bae58bef 100644
--- a/python/fedml/computing/scheduler/comm_utils/job_monitor.py
+++ b/python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -28,7 +28,8 @@
 from .job_utils import JobRunnerUtils
 from ..model_scheduler.device_http_proxy_inference_protocol import FedMLHttpProxyInference
 from ..model_scheduler.device_model_cache import FedMLModelCache
-from ..model_scheduler.autoscaler.autoscaler import Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+from ..model_scheduler.autoscaler.autoscaler import Autoscaler
+from ..model_scheduler.autoscaler.policies import ConcurrentQueryPolicy
 from ..model_scheduler.device_model_db import FedMLModelDatabase
 from ..model_scheduler.device_mqtt_inference_protocol import FedMLMqttInference
 from ..slave import client_constants
@@ -109,8 +110,14 @@ def autoscaler_reconcile_after_interval(self):
             # Set the policy, here we use latency, but other metrics are possible as well, such as qps.
             # For more advanced use cases look for the testing scripts under the autoscaler/test directory.
             autoscaling_policy_config = \
-                {"queries_per_replica": endpoint_settings["target_queries_per_replica"],
-                 "window_size_secs": endpoint_settings["aggregation_window_size_seconds"]}
+                {
+                    "current_replicas": int(endpoint_settings["replica_num"]),
+                    "min_replicas": int(endpoint_settings["scale_min"]),
+                    "max_replicas": int(endpoint_settings["scale_max"]),
+                    "queries_per_replica": int(endpoint_settings["target_queries_per_replica"]),
+                    "window_size_secs": int(endpoint_settings["aggregation_window_size_seconds"]),
+                    "scaledown_delay_secs": int(endpoint_settings["scale_down_delay_seconds"]),
+                }
             autoscaling_policy = ConcurrentQueryPolicy(**autoscaling_policy_config)
 
             e_id, e_name, model_name = endpoint_settings["endpoint_id"], endpoint_settings["endpoint_name"], \
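
To make the reconciliation step concrete, here is a minimal sketch of the policy construction the hunk above performs. The endpoint_settings values are hypothetical; in the scheduler they come from FedMLModelCache.get_endpoint_settings(), and the int() casts mirror the diff, which suggests the cached values may arrive as strings.

```python
# Illustrative only: endpoint_settings is a made-up stand-in for the cached user settings.
from fedml.computing.scheduler.model_scheduler.autoscaler.policies import ConcurrentQueryPolicy

endpoint_settings = {
    "replica_num": "2", "scale_min": "1", "scale_max": "5",
    "target_queries_per_replica": "10",
    "aggregation_window_size_seconds": "60",
    "scale_down_delay_seconds": "120",
}

autoscaling_policy = ConcurrentQueryPolicy(
    current_replicas=int(endpoint_settings["replica_num"]),
    min_replicas=int(endpoint_settings["scale_min"]),
    max_replicas=int(endpoint_settings["scale_max"]),
    queries_per_replica=int(endpoint_settings["target_queries_per_replica"]),
    window_size_secs=int(endpoint_settings["aggregation_window_size_seconds"]),
    scaledown_delay_secs=int(endpoint_settings["scale_down_delay_seconds"]),
)
print(autoscaling_policy)
```
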
diff --git a/python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py b/python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py
index 3394e46bd7..cd33e61951 100644
--- a/python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py
+++ b/python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py
@@ -1,3 +1,4 @@
+import logging
 import math
 import time
 import warnings
@@ -6,9 +7,8 @@
 
 from enum import Enum
 from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
-from pydantic import BaseModel, field_validator
+from fedml.computing.scheduler.model_scheduler.autoscaler.policies import *
 from utils.singleton import Singleton
-from typing import Dict
 
 
 class ScaleOp(Enum):
@@ -17,95 +17,6 @@ class ScaleOp(Enum):
     DOWN_IN_OP = -1
 
 
-class AutoscalingPolicy(BaseModel):
-    """
-    Below are some default values for every endpoint.
-
-    The following parameters refer to:
-    - current_replicas: the number of currently running replicas of the endpoint
-    - min_replicas: the minimum number of replicas of the endpoint in the instance group
-    - max_replicas: the maximum number of replicas of the endpoint in the instance group
-    - release_replica_after_idle_secs: when to release a single idle replica
-    - scaledown_delay_secs: how many seconds to wait before performing a scale down operation
-    - scaleup_cost_secs: how many seconds it takes/costs to perform a scale up operation
-    - last_triggering_value: the last value that triggered a scaling operation
-
-    The `replica_idle_grace_secs` parameter is used as
-    the monitoring interval after which a running replica
-    of an idle endpoint should be released.
-    """
-    current_replicas: int = 0
-    min_replicas: int = 0
-    max_replicas: int = 0
-    release_replica_after_idle_secs: float = 300
-    scaledown_delay_secs: float = 60
-    scaleup_cost_secs: float = 300
-    last_triggering_value: float = None
-
-
-class EWMPolicy(AutoscalingPolicy):
-    """
-    Configuration parameters for the reactive autoscaling policy.
-    EWM stands for Exponential Weighted Calculations, since we use
-    the pandas.DataFrame.ewm() functionality.
-
-    For panda's EWM using alpha = 0.1, we indicate that the most recent
-    values are weighted more. The reason is that the exponential weighted
-    mean formula in pandas is computed as:
-        Yt = X_t + (1-a) * X_{t-1} + (1-a)^2 X_{t-2} / (1 + (1-a) + (1-a)^2)
-
-    The following parameters refer to:
-    - ewm_mins: the length of the interval we consider for reactive decision
-    - ewm_alpha: the decay factor for the exponential weighted interval
-    - ewm_latest: the latest recorded value of the metric's exponential weighted mean
-    - ub_threshold: the upper bound scaling factor threshold for reactive decision
-    - lb_threshold: the lower bound scaling factor threshold for reactive decision
-
-    Example:
-
-        Let's say that we consider 15 minutes as the length of our interval and a
-        decay factor alpha with a value of 0.5:
-            Original Sequence: [0.1, 0.2, 0.4, 3, 5, 10]
-            EWM Sequence: [0.1, [0.166, 0.3, 1.74, 3.422, 6.763]
-
-        If we assume that our previous scaling operation was triggered at value Y,
-        then the conditions we use to decide whether to scale up or down are:
-            Latency:
-                ScaleUP: X > ((1 + ub_threshold) * Y)
-                ScaleDown: X < (lb_threshold * Y)
-            QPS:
-                ScaleUP: X < (lb_threshold * Y)
-                ScaleDown: X < ((1 + ub_threshold) * Y)
-
-        In other words, QPS is the inverse of Latency and vice versa.
-    """
-    metric: str = "ewm_latency"
-    ewm_mins: int = 15
-    ewm_alpha: float = 0.5
-    ewm_latest: float = None
-    ub_threshold: float = 0.5
-    lb_threshold: float = 0.5
-
-    @field_validator("metric")
-    def validate_option(cls, v):
-        assert v in ["ewm_latency", "ewm_qps"]
-        return v
-
-
-class ConcurrentQueryPolicy(AutoscalingPolicy):
-    """
-    This policy captures the number of queries we want to support
-    per replica over the defined window length in seconds.
-    """
-    queries_per_replica: int = 1
-    window_size_secs: int = 60
-
-
-class PredictivePolicy(AutoscalingPolicy):
-    # TODO(fedml-dimitris): TO BE COMPLETED!
-    pass
-
-
 class Autoscaler(metaclass=Singleton):
 
     def __init__(self, redis_addr="local", redis_port=6379, redis_password="fedml_default"):
@@ -117,6 +28,11 @@ def __init__(self, redis_addr="local", redis_port=6379, redis_password="fedml_de
     def get_instance(*args, **kwargs):
         return Autoscaler(*args, **kwargs)
 
+    @classmethod
+    def get_current_timestamp_micro_seconds(cls):
+        # in REDIS we record/operate in micro-seconds, hence the division by 1e3!
+        return int(format(time.time_ns() / 1000.0, '.0f'))
+
     @classmethod
     def scale_operation_predictive(cls,
                                    predictive_policy: PredictivePolicy,
@@ -133,10 +49,13 @@ def scale_operation_ewm(cls,
         # Adding the context below to avoid having a series of warning messages.
         with warnings.catch_warnings():
             warnings.simplefilter(action='ignore', category=FutureWarning)
-            short_period_data = metrics.last("{}min".format(ewm_policy.ewm_mins))
+            period_data = metrics.last("{}min".format(ewm_policy.ewm_mins))
+            # If the data frame window is empty then do nothing more, just return.
+            if period_data.empty:
+                return ScaleOp.NO_OP
         metric_name = "current_latency" \
             if "ewm_latency" == ewm_policy.metric else "current_qps"
-        ewm_period = short_period_data[metric_name] \
+        ewm_period = period_data[metric_name] \
             .ewm(alpha=ewm_policy.ewm_alpha).mean()
 
         scale_op = ScaleOp.NO_OP
@@ -150,15 +69,15 @@
         ewm_policy.ewm_latest = latest_value
         # Assign the triggering value the first time we call the reactive
        # policy, if of course it has not been assigned already.
-        if ewm_policy.last_triggering_value is None:
-            ewm_policy.last_triggering_value = latest_value
+        if ewm_policy.previous_triggering_value is None:
+            ewm_policy.previous_triggering_value = latest_value
 
-        upper_bound = (1 + ewm_policy.ub_threshold) * ewm_policy.last_triggering_value
-        lower_bound = ewm_policy.lb_threshold * ewm_policy.last_triggering_value
+        upper_bound = (1 + ewm_policy.ub_threshold) * ewm_policy.previous_triggering_value
+        lower_bound = ewm_policy.lb_threshold * ewm_policy.previous_triggering_value
 
         if latest_value <= lower_bound or latest_value >= upper_bound:
             # Replace the triggering value if the policy requests so.
-            ewm_policy.last_triggering_value = latest_value
+            ewm_policy.previous_triggering_value = latest_value
 
             if ewm_policy.metric == "ewm_latency":
                 # If the 'latency' is smaller than the
@@ -191,33 +110,78 @@ def scale_operation_query_concurrency(cls,
             warnings.simplefilter(action='ignore', category=FutureWarning)
             # Here, the number of queries is the number of rows in the short period data frame.
             period_data = metrics.last("{}s".format(concurrent_query_policy.window_size_secs))
+            # If the data frame window is empty then do nothing more, just return.
+            if period_data.empty:
+                return ScaleOp.NO_OP
 
         queries_num = period_data.shape[0]
 
-        # QSR: Queries per Second per Replica: (Number of Queries / Number of Current Replicas) / Window Size
-        # Comparing target QSR to current QSR.
-        target_qsr = \
-            concurrent_query_policy.queries_per_replica / concurrent_query_policy.window_size_secs
-        # We need to floor the target queries per replica, therefore we need to ceil the division
-        # to ensure we will not have too much fluctuation. For instance, if the user requested to
-        # support 2 queries per replica per 60 seconds, the target QSR is 2/60 = 0.0333.
-        # Then, if we had 5 queries sent to 3 replicas in 60 seconds, the current QSR
-        # would be (5/3)/60 = 0.0277. To avoid the fluctuation, we need to round the incoming
-        # number of queries per replica to the nearest integer and then divide by the window size.
-        current_qsr = \
-            (math.ceil(queries_num / concurrent_query_policy.current_replicas) /
-             concurrent_query_policy.window_size_secs)
-
-        if current_qsr > target_qsr:
-            concurrent_query_policy.last_triggering_value = current_qsr
+        try:
+            # QSR: Queries per Second per Replica: (Number of Queries / Number of Current Replicas) / Window Size
+            # Comparing target QSR to current QSR.
+            target_qrs = \
+                concurrent_query_policy.queries_per_replica / concurrent_query_policy.window_size_secs
+            # We need to floor the target queries per replica, therefore we need to ceil the division
+            # to ensure we will not have too much fluctuation. For instance, if the user requested to
+            # support 2 queries per replica per 60 seconds, the target QSR is 2/60 = 0.0333.
+            # Then, if we had 5 queries sent to 3 replicas in 60 seconds, the current QSR
+            # would be (5/3)/60 = 0.0277. To avoid the fluctuation, we need to round the incoming
+            # number of queries per replica to the nearest integer and then divide by the window size.
+            current_qrs = \
+                (math.ceil(queries_num / concurrent_query_policy.current_replicas) /
+                 concurrent_query_policy.window_size_secs)
+        except ZeroDivisionError as error:
+            logging.error("Division by zero.")
+            return ScaleOp.NO_OP
+
+        if current_qrs > target_qrs:
+            concurrent_query_policy.previous_triggering_value = current_qrs
             scale_op = ScaleOp.UP_OUT_OP
-        elif current_qsr < target_qsr:
-            concurrent_query_policy.last_triggering_value = current_qsr
+        elif current_qrs < target_qrs:
+            concurrent_query_policy.previous_triggering_value = current_qrs
             scale_op = ScaleOp.DOWN_IN_OP
         else:
             scale_op = ScaleOp.NO_OP
 
         return scale_op
 
+    @classmethod
+    def scale_operation_meet_traffic_demand(cls,
+                                            meet_traffic_demand_policy: MeetTrafficDemandPolicy,
+                                            metrics: pd.DataFrame) -> ScaleOp:
+
+        # Adding the context below to avoid having a series of warning messages.
+        with warnings.catch_warnings():
+            warnings.simplefilter(action='ignore', category=FutureWarning)
+            # Here, the number of queries is the number of rows in the short period data frame.
+            period_data = metrics.last("{}s".format(meet_traffic_demand_policy.window_size_secs))
+            # If the data frame window is empty then do nothing more, just return.
+            if period_data.empty:
+                return ScaleOp.NO_OP
+
+        period_requests_num = period_data.shape[0]
+        all_latencies = metrics["current_latency"]
+        # Original value is milliseconds, convert to seconds.
+        average_latency = all_latencies.mean() / 1e3
+
+        try:
+            # RS: Requests_per_Second
+            rs = period_requests_num / meet_traffic_demand_policy.window_size_secs
+            # QS: Queries_per_Second
+            qs = 1 / average_latency
+        except ZeroDivisionError as error:
+            logging.error("Division by zero.")
+            return ScaleOp.NO_OP
+
+        scale_op = ScaleOp.NO_OP
+        if rs > qs:
+            # Need to meet the demand.
+            scale_op = ScaleOp.UP_OUT_OP
+        elif rs < qs:
+            # Demand already met.
+            scale_op = ScaleOp.DOWN_IN_OP
+
+        return scale_op
+
     def run_autoscaling_policy(self,
                                autoscaling_policy: AutoscalingPolicy,
                                metrics: pd.DataFrame) -> ScaleOp:
@@ -230,11 +194,16 @@
             scale_op = self.scale_operation_query_concurrency(
                 autoscaling_policy,
                 metrics)
+        elif isinstance(autoscaling_policy, MeetTrafficDemandPolicy):
+            scale_op = self.scale_operation_meet_traffic_demand(
+                autoscaling_policy,
+                metrics)
         elif isinstance(autoscaling_policy, PredictivePolicy):
             scale_op = self.scale_operation_predictive(
                 autoscaling_policy,
                 metrics)
         else:
+            print(autoscaling_policy)
             raise RuntimeError("Not a valid autoscaling policy instance.")
 
         return scale_op
@@ -252,6 +221,48 @@ def validate_scaling_bounds(cls,
             scale_op = ScaleOp.NO_OP
         return scale_op
 
+    def enforce_scaling_down_delay_interval(self,
+                                            endpoint_id,
+                                            autoscaling_policy: AutoscalingPolicy) -> ScaleOp:
+        """
+        This function checks if scaling down delay seconds set by the policy
+        has been exceeded. To enforce the delay it uses REDIS to persist the
+        time of the scaling down operation.
+
+        If such a record exists it fetches the previous scale down operation's timestamp
+        and compares the duration of the interval (delay).
+
+        If the interval is exceeded then it triggers/allows the scaling operation to be
+        passed to the calling process, else it returns a no operation.
+        """
+
+        # If the policy has no scaledown delay then return immediately.
+        if autoscaling_policy.scaledown_delay_secs == 0:
+            return ScaleOp.DOWN_IN_OP
+
+        # By default, we return a no operation.
+        scale_op = ScaleOp.NO_OP
+        previous_timestamp_exists = \
+            self.fedml_model_cache.exists_endpoint_scaling_down_decision_time(endpoint_id)
+        current_timestamp = self.get_current_timestamp_micro_seconds()
+        if previous_timestamp_exists:
+            # Get the timestamp of the previous scaling down timestamp (if any), and
+            # compare the timestamps difference to measure interval's duration.
+            previous_timestamp = \
+                self.fedml_model_cache.get_endpoint_scaling_down_decision_time(endpoint_id)
+            diff_secs = (current_timestamp - previous_timestamp) / 1e6
+            if diff_secs > autoscaling_policy.scaledown_delay_secs:
+                # At this point, we will perform the scaling down operation, hence
+                # we need to delete the previously stored scaling down timestamp (if any).
+                self.fedml_model_cache.delete_endpoint_scaling_down_decision_time(endpoint_id)
+                scale_op = ScaleOp.DOWN_IN_OP
+        else:
+            # Record the timestamp of the scaling down operation.
+            self.fedml_model_cache.set_endpoint_scaling_down_decision_time(
+                endpoint_id, current_timestamp)
+
+        return scale_op
+
     def scale_operation_endpoint(self,
                                  autoscaling_policy: AutoscalingPolicy,
                                  endpoint_id: str) -> ScaleOp:
@@ -270,19 +281,18 @@
         """
 
         # Fetch most recent metric record from the database.
-        most_recent_metric = self.fedml_model_cache.get_endpoint_metrics(
-            endpoint_id=endpoint_id,
-            k_recent=1)
+        metrics = self.fedml_model_cache.get_endpoint_metrics(
+            endpoint_id=endpoint_id)
 
         # Default to nothing.
         scale_op = ScaleOp.NO_OP
-        if not most_recent_metric:
+        if not metrics:
             # If no metric exists then no scaling operation.
             return scale_op
 
         # If we continue here, then it means that there was at least one request.
         # The `most_recent_metric` is of type list, hence we need to access index 0.
-        most_recent_metric = most_recent_metric[0]
+        most_recent_metric = metrics[-1]
         latest_request_timestamp_micro_secs = most_recent_metric["timestamp"]
         # The time module does not have a micro-second function built-in, so we need to
         # divide nanoseconds by 1e3 and convert to micro-seconds.
@@ -301,25 +311,28 @@
             # then we need more resources, hence ScaleOp.UP_OUT_OP.
             scale_op = ScaleOp.UP_OUT_OP
         else:
-            # Else, trigger the autoscaling policy. Fetch all previous
-            # timeseries values. We do not check if the list is empty,
-            # since we already have past requests.
-            metrics = self.fedml_model_cache.get_endpoint_metrics(
-                endpoint_id=endpoint_id)
+            # Else, trigger the autoscaling policy with all existing values.
             metrics_df = pd.DataFrame.from_records(metrics)
             metrics_df = metrics_df.set_index('timestamp')
             # timestamp is expected to be in micro-seconds, hence unit='us'.
            metrics_df.index = pd.to_datetime(metrics_df.index, unit="us")
-
-            # Trigger autoscaler with the metrics we have collected.
+            # Decide scaling operation given all metrics.
             scale_op = self.run_autoscaling_policy(
                 autoscaling_policy=autoscaling_policy,
                 metrics=metrics_df)
 
-        # Finally, check the scaling bounds of the endpoint
+        # Check the scaling bounds of the endpoint
         # before triggering the scaling operation.
         scale_op = self.validate_scaling_bounds(
             scale_op=scale_op,
            autoscaling_policy=autoscaling_policy)
+        print(scale_op)
+
+        # If the scaling decision is a scale down operation, then perform
+        # a final check to ensure the scaling down grace period is satisfied.
+        if scale_op == scale_op.DOWN_IN_OP:
+            scale_op = self.enforce_scaling_down_delay_interval(
+                endpoint_id, autoscaling_policy)
+
         return scale_op
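
The headline change in autoscaler.py is the scale-down grace period: a DOWN_IN_OP is only honored after scaledown_delay_secs have elapsed since the first down decision, with the decision time persisted in Redis in micro-seconds. Below is a self-contained sketch of that check, with a plain dict standing in for the Redis hash; the helper names are illustrative and not part of the FedML API.

```python
import time

# Stand-in for the Redis hash used by FedMLModelCache; illustrative only.
_scale_down_decision_time = {}


def now_micros() -> int:
    # Timestamps are kept in micro-seconds, mirroring get_current_timestamp_micro_seconds().
    return time.time_ns() // 1000


def allow_scale_down(endpoint_id: str, scaledown_delay_secs: float) -> bool:
    """Return True only once the delay has elapsed since the first down decision."""
    if scaledown_delay_secs == 0:
        return True
    current = now_micros()
    previous = _scale_down_decision_time.get(endpoint_id)
    if previous is None:
        # First down decision: record it and hold the operation back.
        _scale_down_decision_time[endpoint_id] = current
        return False
    if (current - previous) / 1e6 > scaledown_delay_secs:
        # Delay satisfied: clear the record and allow the scale down.
        del _scale_down_decision_time[endpoint_id]
        return True
    return False


print(allow_scale_down("ep-1", 1))  # False: records the decision time
print(allow_scale_down("ep-1", 1))  # False: delay not yet exceeded
```
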
diff --git a/python/fedml/computing/scheduler/model_scheduler/autoscaler/policies.py b/python/fedml/computing/scheduler/model_scheduler/autoscaler/policies.py
new file mode 100644
index 0000000000..546817ec82
--- /dev/null
+++ b/python/fedml/computing/scheduler/model_scheduler/autoscaler/policies.py
@@ -0,0 +1,98 @@
+from pydantic import BaseModel, field_validator, NonNegativeInt, NonNegativeFloat
+
+
+class AutoscalingPolicy(BaseModel):
+    """
+    Below are some default values for every endpoint.
+
+    The following parameters refer to:
+    - current_replicas: the number of currently running replicas of the endpoint
+    - min_replicas: the minimum number of replicas of the endpoint in the instance group
+    - max_replicas: the maximum number of replicas of the endpoint in the instance group
+    - release_replica_after_idle_secs: when to release a single idle replica
+    - scaledown_delay_secs: how many seconds to wait before performing a scale down operation
+    - scaleup_cost_secs: how many seconds it takes/costs to perform a scale up operation
+    - previous_triggering_value: the last value that triggered a scaling operation
+
+    The `release_replica_after_idle_secs` parameter is used as
+    the monitoring interval after which a running replica
+    of an idle endpoint should be released.
+    """
+    current_replicas: NonNegativeInt
+    min_replicas: NonNegativeInt
+    max_replicas: NonNegativeInt
+    previous_triggering_value: float = None
+    release_replica_after_idle_secs: NonNegativeInt = 300  # default is after 5 minutes
+    scaledown_delay_secs: NonNegativeInt = 60  # default is 1 minute
+    scaleup_cost_secs: NonNegativeInt = 300  # default is 5 minutes
+
+
+class EWMPolicy(AutoscalingPolicy):
+    """
+    Configuration parameters for the reactive autoscaling policy.
+    EWM stands for Exponentially Weighted Mean, since we use
+    the pandas.DataFrame.ewm() functionality.
+
+    For pandas' EWM, an alpha such as 0.1 weighs the most recent
+    values more heavily, because the exponentially weighted
+    mean in pandas is computed as:
+        Y_t = (X_t + (1-a)*X_{t-1} + (1-a)^2*X_{t-2} + ...) / (1 + (1-a) + (1-a)^2 + ...)
+
+    The following parameters refer to:
+    - ewm_mins: the length of the interval we consider for reactive decision
+    - ewm_alpha: the decay factor for the exponential weighted interval
+    - ewm_latest: the latest recorded value of the metric's exponential weighted mean
+    - ub_threshold: the upper bound scaling factor threshold for reactive decision
+    - lb_threshold: the lower bound scaling factor threshold for reactive decision
+
+    Example:
+
+        Let's say that we consider 15 minutes as the length of our interval and a
+        decay factor alpha with a value of 0.5:
+            Original Sequence: [0.1, 0.2, 0.4, 3, 5, 10]
+            EWM Sequence: [0.1, 0.166, 0.3, 1.74, 3.422, 6.763]
+
+        If we assume that our previous scaling operation was triggered at value Y,
+        then the conditions we use to decide whether to scale up or down are:
+            Latency:
+                ScaleUP: X > ((1 + ub_threshold) * Y)
+                ScaleDown: X < (lb_threshold * Y)
+            QPS:
+                ScaleUP: X < (lb_threshold * Y)
+                ScaleDown: X > ((1 + ub_threshold) * Y)
+
+        In other words, QPS is the inverse of Latency and vice versa.
+    """
+    metric: str  # possible values: ["ewm_latency", "ewm_qps"]
+    ewm_mins: NonNegativeInt  # recommended value: 15 minutes
+    ewm_alpha: NonNegativeFloat  # recommended value: 0.1
+    ewm_latest: NonNegativeFloat = None  # will be filled by the algorithm
+    ub_threshold: NonNegativeFloat  # recommended value: 0.5
+    lb_threshold: NonNegativeFloat  # recommended value: 0.5
+
+    @field_validator("metric")
+    def validate_option(cls, v):
+        assert v in ["ewm_latency", "ewm_qps"]
+        return v
+
+
+class ConcurrentQueryPolicy(AutoscalingPolicy):
+    """
+    This policy captures the number of queries we want to support
+    per replica over the defined window length in seconds.
+    """
+    queries_per_replica: NonNegativeInt  # recommended is at least 1 query
+    window_size_secs: NonNegativeInt  # recommended is at least 60 seconds
+
+
+class MeetTrafficDemandPolicy(AutoscalingPolicy):
+    """
+    This policy scales the endpoint so that the rate of incoming requests
+    observed over the defined window length in seconds can be served by
+    the currently deployed replicas.
+    """
+    window_size_secs: NonNegativeInt
+
+
+class PredictivePolicy(AutoscalingPolicy):
+    # TODO(fedml-dimitris): TO BE COMPLETED!
+    pass
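
Because the policies are now pydantic models with NonNegativeInt/NonNegativeFloat fields and no defaults for the replica bounds, malformed endpoint settings fail at construction time instead of surfacing later inside the autoscaler. A small sketch of that behavior, assuming pydantic v2 and the module path introduced by this file:

```python
from pydantic import ValidationError

from fedml.computing.scheduler.model_scheduler.autoscaler.policies import (
    ConcurrentQueryPolicy, EWMPolicy)

# Valid policy: the replica bounds are now required fields.
policy = ConcurrentQueryPolicy(
    current_replicas=1, min_replicas=1, max_replicas=3,
    queries_per_replica=2, window_size_secs=60)
print(policy.scaledown_delay_secs)  # 60, the class default

# An unknown metric is rejected by the field validator.
try:
    EWMPolicy(current_replicas=1, min_replicas=1, max_replicas=3,
              metric="ewm_throughput",  # not in ["ewm_latency", "ewm_qps"]
              ewm_mins=15, ewm_alpha=0.5, ub_threshold=0.5, lb_threshold=0.5)
except ValidationError as e:
    print(e)

# Negative replica counts are rejected by NonNegativeInt.
try:
    ConcurrentQueryPolicy(current_replicas=-1, min_replicas=1, max_replicas=3,
                          queries_per_replica=2, window_size_secs=60)
except ValidationError as e:
    print(e)
```
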
diff --git a/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/autoscaler_test.py b/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/autoscaler_test.py
index caa9125d8d..7af1022c7d 100644
--- a/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/autoscaler_test.py
+++ b/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/autoscaler_test.py
@@ -3,7 +3,8 @@
 import time
 
 from collections import namedtuple
-from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+from fedml.computing.scheduler.model_scheduler.autoscaler.policies import *
+from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, ScaleOp
 from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
 from fedml.core.mlops.mlops_runtime_log import MLOpsRuntimeLog
 
@@ -11,11 +12,24 @@
 ENV_REDIS_PORT = 6379
 ENV_REDIS_PASSWD = "fedml_default"
 ENV_ENDPOINT_ID_1 = 12345
-ENV_ENDPOINT_ID_2 = 77777
 
 
 class AutoscalerTest(unittest.TestCase):
 
+    @classmethod
+    def populate_redis_with_dummy_metrics(cls):
+        fedml_model_cache = FedMLModelCache.get_instance()
+        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
+        fedml_model_cache.set_monitor_metrics(
+            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
+
+    @classmethod
+    def clear_redis(cls):
+        fedml_model_cache = FedMLModelCache.get_instance()
+        # Clean up redis after test.
+        fedml_model_cache.delete_endpoint_metrics(
+            endpoint_ids=[ENV_ENDPOINT_ID_1])
+
     def test_autoscaler_singleton_pattern(self):
         autoscaler_1 = Autoscaler.get_instance()
         autoscaler_2 = Autoscaler.get_instance()
@@ -24,72 +38,118 @@ def test_autoscaler_singleton_pattern(self):
         self.assertTrue(autoscaler_1 is autoscaler_2)
 
     def test_scale_operation_single_endpoint_ewm_policy(self):
-
-        # Populate redis with some dummy values for each endpoint before running the test.
-        fedml_model_cache = FedMLModelCache.get_instance()
-        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-
+        self.populate_redis_with_dummy_metrics()
         # Create autoscaler instance and define policy.
         autoscaler = Autoscaler.get_instance()
         latency_reactive_policy_default = {
+            "current_replicas": 1,
             "min_replicas": 1,
             "max_replicas": 1,
-            "current_replicas": 1,
             "metric": "ewm_latency",
             "ewm_mins": 15,
             "ewm_alpha": 0.5,
             "ub_threshold": 0.5,
             "lb_threshold": 0.5
         }
-
         autoscaling_policy = EWMPolicy(**latency_reactive_policy_default)
         scale_op_1 = autoscaler.scale_operation_endpoint(
             autoscaling_policy,
             endpoint_id=ENV_ENDPOINT_ID_1)
-        scale_op_2 = autoscaler.scale_operation_endpoint(
-            autoscaling_policy,
-            endpoint_id=ENV_ENDPOINT_ID_2)
-
-        # Clean up redis after test.
-        fedml_model_cache.delete_model_endpoint_metrics(
-            endpoint_ids=[ENV_ENDPOINT_ID_1, ENV_ENDPOINT_ID_2])
 
         # TODO Change to ScaleUP or ScaleDown not only not None.
         self.assertIsNotNone(scale_op_1)
-        self.assertIsNotNone(scale_op_2)
+        self.clear_redis()
 
     def test_scale_operation_single_endpoint_concurrency_query_policy(self):
+        self.populate_redis_with_dummy_metrics()
+        # Create autoscaler instance and define policy.
+        autoscaler = Autoscaler.get_instance()
+        concurrent_query_policy = {
+            "current_replicas": 1,
+            "min_replicas": 1,
+            "max_replicas": 1,
+            "queries_per_replica": 1,
+            "window_size_secs": 60
+        }
+        autoscaling_policy = ConcurrentQueryPolicy(**concurrent_query_policy)
+        scale_op_1 = autoscaler.scale_operation_endpoint(
+            autoscaling_policy,
+            endpoint_id=ENV_ENDPOINT_ID_1)
 
-        # Populate redis with some dummy values for each endpoint before running the test.
-        fedml_model_cache = FedMLModelCache.get_instance()
-        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
+        # TODO Change to ScaleUP or ScaleDown not only not None.
+        self.assertIsNotNone(scale_op_1)
+        self.clear_redis()
 
+    def test_scale_operation_single_endpoint_meet_traffic_demand_query_policy(self):
+        self.populate_redis_with_dummy_metrics()
         # Create autoscaler instance and define policy.
         autoscaler = Autoscaler.get_instance()
         concurrent_query_policy = {
+            "current_replicas": 1,
             "min_replicas": 1,
             "max_replicas": 1,
-            "current_replicas": 1,
-            "queries_per_replica": 2, "window_size_secs": 60
+            "window_size_secs": 60
         }
-
-        autoscaling_policy = EWMPolicy(**concurrent_query_policy)
+        autoscaling_policy = MeetTrafficDemandPolicy(**concurrent_query_policy)
         scale_op_1 = autoscaler.scale_operation_endpoint(
             autoscaling_policy,
             endpoint_id=ENV_ENDPOINT_ID_1)
 
-        # Clean up redis after test.
-        fedml_model_cache.delete_model_endpoint_metrics(
-            endpoint_ids=[ENV_ENDPOINT_ID_1])
-
         # TODO Change to ScaleUP or ScaleDown not only not None.
         self.assertIsNotNone(scale_op_1)
+        self.clear_redis()
+
+    def test_validate_scaling_bounds(self):
+        # Create autoscaler instance and define policy.
+        autoscaler = Autoscaler.get_instance()
+        autoscaling_policy = {
+            "current_replicas": 2,
+            "min_replicas": 1,
+            "max_replicas": 3,
+        }
+        autoscaling_policy = AutoscalingPolicy(**autoscaling_policy)
+
+        # Validate scale up.
+        scale_up = autoscaler.validate_scaling_bounds(ScaleOp.UP_OUT_OP, autoscaling_policy)
+        self.assertEquals(scale_up, ScaleOp.UP_OUT_OP)
+
+        # Validate scale down.
+        scale_down = autoscaler.validate_scaling_bounds(ScaleOp.DOWN_IN_OP, autoscaling_policy)
+        self.assertEquals(scale_down, ScaleOp.DOWN_IN_OP)
+
+        # Validate max out-of-bounds.
+        autoscaling_policy.current_replicas = 3
+        scale_oob_max = autoscaler.validate_scaling_bounds(ScaleOp.UP_OUT_OP, autoscaling_policy)
+        self.assertEquals(scale_oob_max, ScaleOp.NO_OP)
+
+        # Validate min out-of-bounds.
+        autoscaling_policy.current_replicas = 1
+        scale_oob_min = autoscaler.validate_scaling_bounds(ScaleOp.DOWN_IN_OP, autoscaling_policy)
+        self.assertEquals(scale_oob_min, ScaleOp.NO_OP)
+
+    def test_enforce_scaling_down_delay_interval(self):
+        self.populate_redis_with_dummy_metrics()
+        # Create autoscaler instance and define policy.
+        autoscaler = Autoscaler.get_instance()
+        autoscaling_policy = {
+            "current_replicas": 1,
+            "min_replicas": 1,
+            "max_replicas": 1,
+        }
+        autoscaling_policy = AutoscalingPolicy(**autoscaling_policy)
+
+        autoscaling_policy.scaledown_delay_secs = 0.0
+        scale_down = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
+        self.assertEquals(scale_down, ScaleOp.DOWN_IN_OP)
+
+        autoscaling_policy.scaledown_delay_secs = 1
+        scale_noop = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
+        self.assertEquals(scale_noop, ScaleOp.NO_OP)
+
+        time.sleep(2)
+        scale_down = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
+        self.assertEquals(scale_down, ScaleOp.DOWN_IN_OP)
+        self.clear_redis()
 
 
 if __name__ == "__main__":
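
The dummy metric written by populate_redis_with_dummy_metrics uses int(time.time_ns() / 1000), i.e. a micro-second timestamp, which scale_operation_endpoint later converts back with pandas using unit="us". A short sketch of that round trip; the records below are invented and only the column names and the timestamp convention match the diff:

```python
import time

import pandas as pd

# Invented sample records; timestamps are micro-seconds since the epoch, as in the tests above.
records = [
    {"timestamp": int(time.time_ns() / 1000), "current_latency": 100.0, "current_qps": 5.0},
    {"timestamp": int(time.time_ns() / 1000) + 2_000_000, "current_latency": 120.0, "current_qps": 6.0},
]

metrics_df = pd.DataFrame.from_records(records).set_index("timestamp")
# unit="us" mirrors scale_operation_endpoint(): the index is interpreted as micro-seconds.
metrics_df.index = pd.to_datetime(metrics_df.index, unit="us")

# The policies slice the frame by wall-clock windows, e.g. the last 60 seconds.
print(metrics_df.last("60s").shape[0])  # number of requests inside the window (2 here)
```
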
diff --git a/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/scaling_algorithm_sim_test.py b/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/scaling_algorithm_sim_test.py
index 455e985219..e417030fa6 100644
--- a/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/scaling_algorithm_sim_test.py
+++ b/python/fedml/computing/scheduler/model_scheduler/autoscaler/test/scaling_algorithm_sim_test.py
@@ -8,7 +8,7 @@
 from collections import namedtuple
 
 from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import \
-    Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+    Autoscaler, EWMPolicy, ConcurrentQueryPolicy, MeetTrafficDemandPolicy
 from fedml.core.mlops.mlops_runtime_log import MLOpsRuntimeLog
 from fedml.computing.scheduler.model_scheduler.autoscaler.test.traffic_simulation import TrafficSimulation
 from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
@@ -73,9 +73,9 @@ def plot_qps_vs_latency_vs_scale(
     parser.add_argument('--endpoint_id', default=12345)
     parser.add_argument('--metric',
                         default="query_concurrency",
-                        help="Either ewm_latency, ewm_qps, query_concurrency")
+                        help="Either ewm_latency, ewm_qps, query_concurrency, meet_traffic_demand")
     parser.add_argument('--distribution',
-                        default="random",
+                        default="seasonal",
                         help="Either random, linear, exponential or seasonal.")
     args = parser.parse_args()
 
@@ -101,7 +101,7 @@ def plot_qps_vs_latency_vs_scale(
     elif args.distribution == "seasonal":
         traffic_dist = TrafficSimulation(start_date=start_date).generate_traffic_with_seasonality(
             num_values=1000,
-            submit_request_every_x_secs=10,
+            submit_request_every_x_secs=30,
             with_warmup=False)
     else:
         raise RuntimeError("Not a supported distribution")
@@ -109,27 +109,37 @@ def plot_qps_vs_latency_vs_scale(
     # INFO Please remember to change these two variables below when attempting
     # to test the simulation of the autoscaling policy simulation.
     testing_metric = args.metric
+    policy_config = dict()
+    policy_config["min_replicas"] = 1  # Always 1.
+    policy_config["max_replicas"] = 1000  # Unlimited.
+    policy_config["current_replicas"] = 1
+    policy_config["scaledown_delay_secs"] = 0
+
     if testing_metric == "ewm_latency":
-        policy_config = \
-            {"metric": "ewm_latency", "ewm_mins": 15, "ewm_alpha": 0.5, "ub_threshold": 0.5, "lb_threshold": 0.5}
+        policy_config.update({
+            "metric": "ewm_latency", "ewm_mins": 15, "ewm_alpha": 0.5, "ub_threshold": 0.5, "lb_threshold": 0.5
+        })
         autoscaling_policy = EWMPolicy(**policy_config)
     elif testing_metric == "ewm_qps":
-        policy_config = \
-            {"metric": "ewm_qps", "ewm_mins": 15, "ewm_alpha": 0.5, "ub_threshold": 2, "lb_threshold": 0.5}
+        policy_config.update({
+            "metric": "ewm_qps", "ewm_mins": 15, "ewm_alpha": 0.5, "ub_threshold": 2, "lb_threshold": 0.5
+        })
         autoscaling_policy = EWMPolicy(**policy_config)
     elif testing_metric == "query_concurrency":
-        policy_config = \
-            {"queries_per_replica": 2, "window_size_secs": 60}
+        policy_config.update({
+            "queries_per_replica": 2, "window_size_secs": 60
+        })
         autoscaling_policy = ConcurrentQueryPolicy(**policy_config)
+    elif testing_metric == "meet_traffic_demand":
+        policy_config.update({
+            "window_size_secs": 60
+        })
+        autoscaling_policy = MeetTrafficDemandPolicy(**policy_config)
     else:
         raise RuntimeError("Please define a valid policy metric.")
     print(policy_config)
-
     autoscaler = Autoscaler.get_instance(args.redis_addr, args.redis_port, args.redis_password)
-    autoscaling_policy.min_replicas = 1  # Always 1.
-    autoscaling_policy.max_replicas = 1000  # Unlimited.
-    autoscaling_policy.current_replicas = 1
 
     scale_operations = []
     ewm_values = []
@@ -160,7 +170,7 @@ def plot_qps_vs_latency_vs_scale(
                 autoscaling_policy.current_replicas + scale_op.value
             if isinstance(autoscaling_policy, EWMPolicy):
                 ewm_values.append(autoscaling_policy.ewm_latest)
-                triggering_values.append(autoscaling_policy.last_triggering_value)
+                triggering_values.append(autoscaling_policy.previous_triggering_value)
             scale_operations.append(scale_op)
 
         triggering_values_to_plot = []
@@ -179,5 +189,5 @@ def plot_qps_vs_latency_vs_scale(
         triggering_points=triggering_values_to_plot)
 
     # Clear redis monitor keys.
-    fedml_model_cache.delete_model_endpoint_metrics(
+    fedml_model_cache.delete_endpoint_metrics(
        endpoint_ids=[args.endpoint_id])
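
The simulation script now builds one shared base configuration and layers the metric-specific keys on top with dict.update(). A condensed sketch of that dispatch; the build_policy helper is not part of the script, it only mirrors the branching above.

```python
# Hypothetical helper mirroring the dispatch in scaling_algorithm_sim_test.py.
from fedml.computing.scheduler.model_scheduler.autoscaler.policies import (
    ConcurrentQueryPolicy, EWMPolicy, MeetTrafficDemandPolicy)


def build_policy(metric: str):
    policy_config = {
        "min_replicas": 1,          # Always 1.
        "max_replicas": 1000,       # Effectively unlimited.
        "current_replicas": 1,
        "scaledown_delay_secs": 0,  # No delay while simulating.
    }
    if metric in ("ewm_latency", "ewm_qps"):
        policy_config.update({"metric": metric, "ewm_mins": 15, "ewm_alpha": 0.5,
                              "ub_threshold": 0.5, "lb_threshold": 0.5})
        return EWMPolicy(**policy_config)
    if metric == "query_concurrency":
        policy_config.update({"queries_per_replica": 2, "window_size_secs": 60})
        return ConcurrentQueryPolicy(**policy_config)
    if metric == "meet_traffic_demand":
        policy_config.update({"window_size_secs": 60})
        return MeetTrafficDemandPolicy(**policy_config)
    raise RuntimeError("Please define a valid policy metric.")


print(build_policy("meet_traffic_demand"))
```
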
diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py b/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py
index 98fd9042b1..cd72cd51e9 100755
--- a/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py
+++ b/python/fedml/computing/scheduler/model_scheduler/device_model_cache.py
@@ -25,6 +25,9 @@ class FedMLModelCache(Singleton):
     # For scale-out & scale-in
     FEDML_MODEL_ENDPOINT_REPLICA_USER_SETTING_TAG = "FEDML_MODEL_ENDPOINT_REPLICA_USER_SETTING_TAG-"
 
+    # For keeping track of the state of scale down decisions.
+    FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG = "FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG-"
+
     # On the worker
     FEDML_MODEL_REPLICA_GPU_IDS_TAG = "FEDML_MODEL_REPLICA_GPU_IDS_TAG-"
 
@@ -106,7 +109,8 @@ def set_user_setting_replica_num(self, end_point_id,
                                      end_point_name: str, model_name: str, model_version: str,
                                      replica_num: int, enable_auto_scaling: bool = False,
                                      scale_min: int = 0, scale_max: int = 0, state: str = "UNKNOWN",
-                                     target_queries_per_replica: int = 60, aggregation_window_size_seconds: int = 60
+                                     target_queries_per_replica: int = 60, aggregation_window_size_seconds: int = 60,
+                                     scale_down_delay_seconds: int = 120
                                      ) -> bool:
         """
         Key: FEDML_MODEL_ENDPOINT_REPLICA_USER_SETTING_TAG--
@@ -131,7 +135,8 @@ def set_user_setting_replica_num(self, end_point_id,
             "model_version": model_version, "replica_num": replica_num, "enable_auto_scaling": enable_auto_scaling,
             "scale_min": scale_min, "scale_max": scale_max, "state": state,
             "target_queries_per_replica": target_queries_per_replica,
-            "aggregation_window_size_seconds": aggregation_window_size_seconds
+            "aggregation_window_size_seconds": aggregation_window_size_seconds,
+            "scale_down_delay_seconds": scale_down_delay_seconds
         }
         try:
             self.redis_connection.set(self.get_user_setting_replica_num_key(end_point_id), json.dumps(replica_num_dict))
@@ -813,7 +818,7 @@ def get_metrics_item_info(self, metrics_item):
         device_id = metrics_item_json["device_id"]
         return total_latency, avg_latency, current_latency, total_request_num, current_qps, avg_qps, timestamp, device_id
 
-    def get_monitor_metrics_key(self,end_point_id, end_point_name, model_name, model_version):
+    def get_monitor_metrics_key(self, end_point_id, end_point_name, model_name, model_version):
         return "{}{}-{}-{}-{}".format(FedMLModelCache.FEDML_MODEL_DEPLOYMENT_MONITOR_TAG,
                                       end_point_id, end_point_name, model_name, model_version)
 
@@ -905,7 +910,8 @@ def get_endpoint_settings(self, endpoint_id) -> Dict:
 
         return endpoint_settings
 
-    def delete_model_endpoint_metrics(self, endpoint_ids: list):
+    def delete_endpoint_metrics(self, endpoint_ids: list) -> bool:
+        status = True
         for endpoint_id in endpoint_ids:
             try:
                 key_pattern = "{}*{}*".format(
@@ -917,3 +923,45 @@ def delete_model_endpoint_metrics(self, endpoint_ids: list):
                     self.redis_connection.delete(k)
             except Exception as e:
                 logging.error(e)
+                # False if an error occurred.
+                status = False
+        return status
+
+    def set_endpoint_scaling_down_decision_time(self, end_point_id, timestamp) -> bool:
+        status = True
+        try:
+            self.redis_connection.hset(
+                self.FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG,
+                mapping={end_point_id: timestamp})
+        except Exception as e:
+            logging.error(e)
+            status = False
+        return status
+
+    def get_endpoint_scaling_down_decision_time(self, end_point_id) -> int:
+        try:
+            scaling_down_decision_time = \
+                self.redis_connection.hget(
+                    self.FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG,
+                    end_point_id)
+            if len(scaling_down_decision_time) > 0:
+                scaling_down_decision_time = int(scaling_down_decision_time)
+            else:
+                scaling_down_decision_time = 0
+        except Exception as e:
+            scaling_down_decision_time = 0
+            logging.error(e)
+
+        return scaling_down_decision_time
+
+    def exists_endpoint_scaling_down_decision_time(self, end_point_id) -> bool:
+        # The Redis hash exists call returns an integer, 0 (not found) or 1 (found),
+        # hence we need to cast it to a boolean value.
+        return bool(self.redis_connection.hexists(
+            self.FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG,
+            end_point_id))
+
+    def delete_endpoint_scaling_down_decision_time(self, end_point_id) -> bool:
+        return bool(self.redis_connection.hdel(
+            self.FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG,
+            end_point_id))
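
The scale-down decision times all live in a single Redis hash keyed by endpoint id, so the new cache methods map directly onto HSET, HGET, HEXISTS and HDEL. A quick sketch with redis-py against a local instance, assuming the same default credentials used by the autoscaler tests; the timestamp value is invented.

```python
import redis

# Assumed local connection; mirrors the defaults used elsewhere in this diff.
r = redis.Redis(host="localhost", port=6379, password="fedml_default", decode_responses=True)

HASH = "FEDML_MODEL_ENDPOINT_SCALING_DOWN_DECISION_TIME_TAG-"
endpoint_id = 12345

r.hset(HASH, mapping={endpoint_id: 1700000000000000})  # micro-second timestamp
print(r.hexists(HASH, endpoint_id))                    # True
print(int(r.hget(HASH, endpoint_id)))                  # 1700000000000000
r.hdel(HASH, endpoint_id)
print(r.hexists(HASH, endpoint_id))                    # False
```
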
diff --git a/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py b/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py
index 39dfd3cc4e..472cab84aa 100755
--- a/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py
+++ b/python/fedml/computing/scheduler/model_scheduler/device_model_monitor.py
@@ -134,8 +134,8 @@ def send_monitoring_metrics(self, index):
             "model_id": self.current_model_id,
             "model_url": self.current_infer_url,
             "end_point_id": self.current_end_point_id,
-            "latency": float(avg_latency * 1000),  # milliseconds
-            "qps": float(avg_qps * 1000),  # milliseconds
+            "latency": round(float(avg_latency) * 1000, 2),  # milliseconds
+            "qps": round(float(avg_qps), 2),
             "current_latency": float(current_latency),
             "current_qps": float(current_qps),
             "total_request_num": int(total_request_num),
diff --git a/python/fedml/computing/scheduler/model_scheduler/device_server_runner.py b/python/fedml/computing/scheduler/model_scheduler/device_server_runner.py
index 5da76ede03..fc8e927423 100755
--- a/python/fedml/computing/scheduler/model_scheduler/device_server_runner.py
+++ b/python/fedml/computing/scheduler/model_scheduler/device_server_runner.py
@@ -939,6 +939,7 @@ def callback_start_deployment(self, topic, payload):
 
         target_queries_per_replica = request_json.get("target_queries_per_replica", 10)
         aggregation_window_size_seconds = request_json.get("aggregation_window_size_seconds", 60)
+        scale_down_delay_seconds = request_json.get("scale_down_delay_seconds", 120)
 
         inference_end_point_id = run_id
 
@@ -953,7 +954,9 @@ def callback_start_deployment(self, topic, payload):
             replica_num=desired_replica_num, enable_auto_scaling=enable_auto_scaling,
             scale_min=scale_min, scale_max=scale_max, state="DEPLOYING",
             aggregation_window_size_seconds=aggregation_window_size_seconds,
-            target_queries_per_replica=target_queries_per_replica)
+            target_queries_per_replica=target_queries_per_replica,
+            scale_down_delay_seconds=int(scale_down_delay_seconds)
+        )
 
         # Start log processor for current run
         self.args.run_id = run_id
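
End to end, the new scale_down_delay_seconds knob travels from the deployment request into the cached user settings and from there into the policy. A compressed sketch of that flow; request_json is a made-up payload, and the keyword names line up with the extended set_user_setting_replica_num signature shown in this diff.

```python
# Hypothetical deployment payload; only the autoscaling-related fields are shown.
request_json = {
    "enable_auto_scaling": True,
    "scale_min": 1,
    "scale_max": 5,
    "target_queries_per_replica": 10,
    "aggregation_window_size_seconds": 60,
    "scale_down_delay_seconds": "120",  # may arrive as a string, hence the int() cast below
}

user_settings = dict(
    enable_auto_scaling=request_json.get("enable_auto_scaling", False),
    scale_min=request_json.get("scale_min", 0),
    scale_max=request_json.get("scale_max", 0),
    target_queries_per_replica=request_json.get("target_queries_per_replica", 10),
    aggregation_window_size_seconds=request_json.get("aggregation_window_size_seconds", 60),
    scale_down_delay_seconds=int(request_json.get("scale_down_delay_seconds", 120)),
)
# These keyword arguments match the extended FedMLModelCache.set_user_setting_replica_num(...)
# signature, which later feeds the ConcurrentQueryPolicy built in job_monitor.py.
print(user_settings)
```
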