[Deploy] Refine Autoscaling Algorithm #2041

Merged · 7 commits · Apr 26, 2024
13 changes: 10 additions & 3 deletions python/fedml/computing/scheduler/comm_utils/job_monitor.py
@@ -28,7 +28,8 @@
from .job_utils import JobRunnerUtils
from ..model_scheduler.device_http_proxy_inference_protocol import FedMLHttpProxyInference
from ..model_scheduler.device_model_cache import FedMLModelCache
-from ..model_scheduler.autoscaler.autoscaler import Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+from ..model_scheduler.autoscaler.autoscaler import Autoscaler
+from ..model_scheduler.autoscaler.policies import ConcurrentQueryPolicy
from ..model_scheduler.device_model_db import FedMLModelDatabase
from ..model_scheduler.device_mqtt_inference_protocol import FedMLMqttInference
from ..slave import client_constants
@@ -109,8 +110,14 @@ def autoscaler_reconcile_after_interval(self):
# Set the policy. Here we use concurrent queries per replica, but other metrics are possible as well, such as latency or qps.
# For more advanced use cases look for the testing scripts under the autoscaler/test directory.
autoscaling_policy_config = \
{"queries_per_replica": endpoint_settings["target_queries_per_replica"],
"window_size_secs": endpoint_settings["aggregation_window_size_seconds"]}
{
"current_replicas": int(endpoint_settings["replica_num"]),
"min_replicas": int(endpoint_settings["scale_min"]),
"max_replicas": int(endpoint_settings["scale_max"]),
"queries_per_replica": int(endpoint_settings["target_queries_per_replica"]),
"window_size_secs": int(endpoint_settings["aggregation_window_size_seconds"]),
"scaledown_delay_secs": int(endpoint_settings["scale_down_delay_seconds"]),
}
autoscaling_policy = ConcurrentQueryPolicy(**autoscaling_policy_config)

e_id, e_name, model_name = endpoint_settings["endpoint_id"], endpoint_settings["endpoint_name"], \
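For context, the mapping above can be exercised in isolation; a minimal sketch, assuming a hypothetical endpoint_settings payload in which all values arrive as strings (the field names follow the diff, the values are illustrative only):

from fedml.computing.scheduler.model_scheduler.autoscaler.policies import ConcurrentQueryPolicy

endpoint_settings = {  # hypothetical payload, values illustrative
    "replica_num": "2",
    "scale_min": "1",
    "scale_max": "5",
    "target_queries_per_replica": "10",
    "aggregation_window_size_seconds": "60",
    "scale_down_delay_seconds": "120",
}
# The int() casts normalize string-valued settings; pydantic then
# enforces the NonNegativeInt constraints on construction.
autoscaling_policy = ConcurrentQueryPolicy(
    current_replicas=int(endpoint_settings["replica_num"]),
    min_replicas=int(endpoint_settings["scale_min"]),
    max_replicas=int(endpoint_settings["scale_max"]),
    queries_per_replica=int(endpoint_settings["target_queries_per_replica"]),
    window_size_secs=int(endpoint_settings["aggregation_window_size_seconds"]),
    scaledown_delay_secs=int(endpoint_settings["scale_down_delay_seconds"]),
)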
271 changes: 142 additions & 129 deletions python/fedml/computing/scheduler/model_scheduler/autoscaler/autoscaler.py

Large diffs are not rendered by default.

98 changes: 98 additions & 0 deletions python/fedml/computing/scheduler/model_scheduler/autoscaler/policies.py
@@ -0,0 +1,98 @@
from typing import Optional

from pydantic import BaseModel, field_validator, NonNegativeInt, NonNegativeFloat


class AutoscalingPolicy(BaseModel):
"""
Below are some default values for every endpoint.

The following parameters refer to:
- current_replicas: the number of currently running replicas of the endpoint
- min_replicas: the minimum number of replicas of the endpoint in the instance group
- max_replicas: the maximum number of replicas of the endpoint in the instance group
- release_replica_after_idle_secs: when to release a single idle replica
- scaledown_delay_secs: how many seconds to wait before performing a scale down operation
- scaleup_cost_secs: how many seconds it takes/costs to perform a scale up operation
- previous_triggering_value: the last value that triggered a scaling operation

The `release_replica_after_idle_secs` parameter is used as
the monitoring interval after which a running replica
of an idle endpoint should be released.
"""
current_replicas: NonNegativeInt
min_replicas: NonNegativeInt
max_replicas: NonNegativeInt
previous_triggering_value: Optional[float] = None
release_replica_after_idle_secs: NonNegativeInt = 300 # default is after 5 minutes
scaledown_delay_secs: NonNegativeInt = 60 # default is 1 minute
scaleup_cost_secs: NonNegativeInt = 300 # default is 5 minutes
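
A quick usage sketch of the base policy (illustrative only, not part of the PR): only the replica counts are required; the timing fields fall back to the defaults documented above, and pydantic rejects negative counts.

from pydantic import ValidationError

policy = AutoscalingPolicy(current_replicas=1, min_replicas=1, max_replicas=3)
assert policy.scaledown_delay_secs == 60  # default: 1 minute
assert policy.release_replica_after_idle_secs == 300  # default: 5 minutes
try:
    AutoscalingPolicy(current_replicas=-1, min_replicas=1, max_replicas=3)
except ValidationError:
    pass  # NonNegativeInt rejects negative replica counts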


class EWMPolicy(AutoscalingPolicy):
"""
Configuration parameters for the reactive autoscaling policy.
EWM stands for Exponentially Weighted Mean, since we use
the pandas.DataFrame.ewm() functionality.

For pandas' EWM with alpha = 0.1, the most recent values are
weighted more heavily. The exponentially weighted mean in pandas
is computed as:
    y_t = (x_t + (1-a)*x_{t-1} + (1-a)^2*x_{t-2} + ...)
          / (1 + (1-a) + (1-a)^2 + ...)

The following parameters refer to:
- ewm_mins: the length of the interval we consider for reactive decision
- ewm_alpha: the decay factor for the exponential weighted interval
- ewm_latest: the latest recorded value of the metric's exponential weighted mean
- ub_threshold: the upper bound scaling factor threshold for reactive decision
- lb_threshold: the lower bound scaling factor threshold for reactive decision

Example:

Let's say that we consider 15 minutes as the length of our interval and a
decay factor alpha with a value of 0.5:
Original Sequence: [0.1, 0.2, 0.4, 3, 5, 10]
EWM Sequence: [0.1, 0.166, 0.3, 1.74, 3.422, 6.763]

If we assume that our previous scaling operation was triggered at value Y,
then the conditions we use to decide whether to scale up or down are:
Latency:
ScaleUP: X > ((1 + ub_threshold) * Y)
ScaleDown: X < (lb_threshold * Y)
QPS:
ScaleUP: X < (lb_threshold * Y)
ScaleDown: X > ((1 + ub_threshold) * Y)

In other words, QPS is the inverse of Latency and vice versa.
"""
metric: str # possible values: ["ewm_latency", "ewm_qps"]
ewm_mins: NonNegativeInt # recommended value: 15 minutes
ewm_alpha: NonNegativeFloat # recommended value: 0.1
ewm_latest: Optional[NonNegativeFloat] = None # will be filled by the algorithm
ub_threshold: NonNegativeFloat # recommended value: 0.5
lb_threshold: NonNegativeFloat # recommended value: 0.5

@field_validator("metric")
def validate_option(cls, v):
assert v in ["ewm_latency", "ewm_qps"]
return v
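
The docstring's example sequence can be reproduced directly with pandas; a minimal sketch, using the docstring's alpha and thresholds rather than this file's recommended defaults:

import pandas as pd

raw = pd.Series([0.1, 0.2, 0.4, 3, 5, 10])
ewm = raw.ewm(alpha=0.5).mean()
print([round(v, 3) for v in ewm])  # [0.1, 0.167, 0.3, 1.74, 3.423, 6.763]

# Latency-based trigger check against a previous triggering value Y:
ub_threshold, lb_threshold, Y = 0.5, 0.5, 1.74
X = float(ewm.iloc[-1])            # latest EWM value
assert X > (1 + ub_threshold) * Y  # 6.763 > 2.61 -> scale up
assert not (X < lb_threshold * Y)  # no scale down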


class ConcurrentQueryPolicy(AutoscalingPolicy):
"""
This policy captures the number of queries we want to support
per replica over the defined window length in seconds.
"""
queries_per_replica: NonNegativeInt # recommended is at least 1 query
window_size_secs: NonNegativeInt # recommended is at least 60 seconds
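
As a worked example of the intent (an assumption about how the policy is evaluated; the actual decision logic lives in autoscaler.py): with queries_per_replica=10, window_size_secs=60, and 2 running replicas, the endpoint targets 20 queries per window:

observed_queries = 35  # queries seen during the last window_size_secs
capacity = 2 * 10      # current_replicas * queries_per_replica
assert observed_queries > capacity  # 35 > 20 -> argues for scaling up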


class MeetTrafficDemandPolicy(AutoscalingPolicy):
"""
This policy scales the endpoint so that its serving capacity
meets the observed traffic demand over the defined window length in seconds.
"""
window_size_secs: NonNegativeInt


class PredictivePolicy(AutoscalingPolicy):
# TODO(fedml-dimitris): TO BE COMPLETED!
pass
@@ -3,19 +3,33 @@
import time

from collections import namedtuple
-from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, EWMPolicy, ConcurrentQueryPolicy
+from fedml.computing.scheduler.model_scheduler.autoscaler.policies import *
+from fedml.computing.scheduler.model_scheduler.autoscaler.autoscaler import Autoscaler, ScaleOp
from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
from fedml.core.mlops.mlops_runtime_log import MLOpsRuntimeLog

ENV_REDIS_ADDR = "local"
ENV_REDIS_PORT = 6379
ENV_REDIS_PASSWD = "fedml_default"
ENV_ENDPOINT_ID_1 = 12345
ENV_ENDPOINT_ID_2 = 77777


class AutoscalerTest(unittest.TestCase):

+    @classmethod
+    def populate_redis_with_dummy_metrics(cls):
+        fedml_model_cache = FedMLModelCache.get_instance()
+        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
+        fedml_model_cache.set_monitor_metrics(
+            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
+
+    @classmethod
+    def clear_redis(cls):
+        fedml_model_cache = FedMLModelCache.get_instance()
+        # Clean up redis after test.
+        fedml_model_cache.delete_endpoint_metrics(
+            endpoint_ids=[ENV_ENDPOINT_ID_1])
+
    def test_autoscaler_singleton_pattern(self):
        autoscaler_1 = Autoscaler.get_instance()
        autoscaler_2 = Autoscaler.get_instance()
@@ -24,72 +38,118 @@ def test_autoscaler_singleton_pattern(self):
        self.assertTrue(autoscaler_1 is autoscaler_2)

    def test_scale_operation_single_endpoint_ewm_policy(self):
-
-        # Populate redis with some dummy values for each endpoint before running the test.
-        fedml_model_cache = FedMLModelCache.get_instance()
-        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-
+        self.populate_redis_with_dummy_metrics()
        # Create autoscaler instance and define policy.
        autoscaler = Autoscaler.get_instance()
        latency_reactive_policy_default = {
+            "current_replicas": 1,
            "min_replicas": 1,
            "max_replicas": 1,
-            "current_replicas": 1,
            "metric": "ewm_latency",
            "ewm_mins": 15,
            "ewm_alpha": 0.5,
            "ub_threshold": 0.5,
            "lb_threshold": 0.5
        }

        autoscaling_policy = EWMPolicy(**latency_reactive_policy_default)
        scale_op_1 = autoscaler.scale_operation_endpoint(
            autoscaling_policy,
            endpoint_id=ENV_ENDPOINT_ID_1)
        scale_op_2 = autoscaler.scale_operation_endpoint(
            autoscaling_policy,
            endpoint_id=ENV_ENDPOINT_ID_2)

-        # Clean up redis after test.
-        fedml_model_cache.delete_model_endpoint_metrics(
-            endpoint_ids=[ENV_ENDPOINT_ID_1, ENV_ENDPOINT_ID_2])
-
        # TODO Change to ScaleUP or ScaleDown not only not None.
        self.assertIsNotNone(scale_op_1)
        self.assertIsNotNone(scale_op_2)
+        self.clear_redis()

    def test_scale_operation_single_endpoint_concurrency_query_policy(self):
-
-        # Populate redis with some dummy values for each endpoint before running the test.
-        fedml_model_cache = FedMLModelCache.get_instance()
-        fedml_model_cache.set_redis_params(ENV_REDIS_ADDR, ENV_REDIS_PORT, ENV_REDIS_PASSWD)
-        fedml_model_cache.set_monitor_metrics(
-            ENV_ENDPOINT_ID_1, "", "", "", 5, 5, 5, 10, 100, 100, int(time.time_ns() / 1000), 0)
-
+        self.populate_redis_with_dummy_metrics()
        # Create autoscaler instance and define policy.
        autoscaler = Autoscaler.get_instance()
        concurrent_query_policy = {
+            "current_replicas": 1,
            "min_replicas": 1,
            "max_replicas": 1,
-            "current_replicas": 1,
-            "queries_per_replica": 2, "window_size_secs": 60
+            "queries_per_replica": 1,
+            "window_size_secs": 60
        }
-
-        autoscaling_policy = EWMPolicy(**concurrent_query_policy)
+        autoscaling_policy = ConcurrentQueryPolicy(**concurrent_query_policy)
        scale_op_1 = autoscaler.scale_operation_endpoint(
            autoscaling_policy,
            endpoint_id=ENV_ENDPOINT_ID_1)

-        # Clean up redis after test.
-        fedml_model_cache.delete_model_endpoint_metrics(
-            endpoint_ids=[ENV_ENDPOINT_ID_1])
-
        # TODO Change to ScaleUP or ScaleDown not only not None.
        self.assertIsNotNone(scale_op_1)
+        self.clear_redis()

+    def test_scale_operation_single_endpoint_meet_traffic_demand_query_policy(self):
+        self.populate_redis_with_dummy_metrics()
+        # Create autoscaler instance and define policy.
+        autoscaler = Autoscaler.get_instance()
+        concurrent_query_policy = {
+            "current_replicas": 1,
+            "min_replicas": 1,
+            "max_replicas": 1,
+            "window_size_secs": 60
+        }
+        autoscaling_policy = MeetTrafficDemandPolicy(**concurrent_query_policy)
+        scale_op_1 = autoscaler.scale_operation_endpoint(
+            autoscaling_policy,
+            endpoint_id=ENV_ENDPOINT_ID_1)
+        # TODO Change to ScaleUP or ScaleDown not only not None.
+        self.assertIsNotNone(scale_op_1)
+        self.clear_redis()

    def test_validate_scaling_bounds(self):
        # Create autoscaler instance and define policy.
        autoscaler = Autoscaler.get_instance()
        autoscaling_policy = {
            "current_replicas": 2,
            "min_replicas": 1,
            "max_replicas": 3,
        }
        autoscaling_policy = AutoscalingPolicy(**autoscaling_policy)

        # Validate scale up.
        scale_up = autoscaler.validate_scaling_bounds(ScaleOp.UP_OUT_OP, autoscaling_policy)
        self.assertEqual(scale_up, ScaleOp.UP_OUT_OP)

        # Validate scale down.
        scale_down = autoscaler.validate_scaling_bounds(ScaleOp.DOWN_IN_OP, autoscaling_policy)
        self.assertEqual(scale_down, ScaleOp.DOWN_IN_OP)

        # Validate max out-of-bounds.
        autoscaling_policy.current_replicas = 3
        scale_oob_max = autoscaler.validate_scaling_bounds(ScaleOp.UP_OUT_OP, autoscaling_policy)
        self.assertEqual(scale_oob_max, ScaleOp.NO_OP)

        # Validate min out-of-bounds.
        autoscaling_policy.current_replicas = 1
        scale_oob_min = autoscaler.validate_scaling_bounds(ScaleOp.DOWN_IN_OP, autoscaling_policy)
        self.assertEqual(scale_oob_min, ScaleOp.NO_OP)
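
The assertions above pin down the bounds semantics. A minimal re-implementation sketch of what they imply (hypothetical; the real method lives in autoscaler.py, whose large diff is not rendered here):

def validate_scaling_bounds_sketch(scale_op: ScaleOp, policy: AutoscalingPolicy) -> ScaleOp:
    # Scaling above max_replicas or below min_replicas becomes a no-op.
    if scale_op == ScaleOp.UP_OUT_OP and policy.current_replicas + 1 > policy.max_replicas:
        return ScaleOp.NO_OP
    if scale_op == ScaleOp.DOWN_IN_OP and policy.current_replicas - 1 < policy.min_replicas:
        return ScaleOp.NO_OP
    return scale_op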

    def test_enforce_scaling_down_delay_interval(self):
        self.populate_redis_with_dummy_metrics()
        # Create autoscaler instance and define policy.
        autoscaler = Autoscaler.get_instance()
        autoscaling_policy = {
            "current_replicas": 1,
            "min_replicas": 1,
            "max_replicas": 1,
        }
        autoscaling_policy = AutoscalingPolicy(**autoscaling_policy)

        autoscaling_policy.scaledown_delay_secs = 0
        scale_down = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
        self.assertEqual(scale_down, ScaleOp.DOWN_IN_OP)

        autoscaling_policy.scaledown_delay_secs = 1
        scale_noop = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
        self.assertEqual(scale_noop, ScaleOp.NO_OP)

        time.sleep(2)
        scale_down = autoscaler.enforce_scaling_down_delay_interval(ENV_ENDPOINT_ID_1, autoscaling_policy)
        self.assertEqual(scale_down, ScaleOp.DOWN_IN_OP)
        self.clear_redis()
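
This test implies the delay semantics: with a zero delay a scale-down fires immediately; otherwise the first scale-down intent only starts the clock, and the operation fires once the delay has elapsed. A hypothetical sketch of that behavior (the real implementation lives in autoscaler.py and keeps its state in Redis):

import time

_first_scaledown_ts = {}  # endpoint_id -> timestamp of first scale-down intent

def enforce_scaling_down_delay_sketch(endpoint_id, policy):
    if policy.scaledown_delay_secs == 0:
        return ScaleOp.DOWN_IN_OP
    now = time.time()
    started = _first_scaledown_ts.setdefault(endpoint_id, now)
    if now - started >= policy.scaledown_delay_secs:
        _first_scaledown_ts.pop(endpoint_id, None)  # reset the clock
        return ScaleOp.DOWN_IN_OP
    return ScaleOp.NO_OP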


if __name__ == "__main__":
    unittest.main()