Take into account queue length in autoscaling #5684

Merged: 5 commits merged on Sep 11, 2019. Showing changes from 2 commits.
10 changes: 5 additions & 5 deletions doc/source/autoscaling.rst
@@ -14,7 +14,7 @@ as described in `the boto docs <http://boto3.readthedocs.io/en/latest/guide/conf
Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example-full.yaml <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/aws/example-full.yaml>`__ cluster config file will create a small cluster with a m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers <https://aws.amazon.com/ec2/spot/>`__.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="localhost:6379")``.
SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="auto")``.

.. code-block:: bash

@@ -37,7 +37,7 @@ First, install the Google API client (``pip install google-api-python-client``),
Then you're ready to go. The provided `ray/python/ray/autoscaler/gcp/example-full.yaml <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/gcp/example-full.yaml>`__ cluster config file will create a small cluster with a n1-standard-2 head node (on-demand) configured to autoscale up to two n1-standard-2 `preemptible workers <https://cloud.google.com/preemptible-vms/>`__. Note that you'll need to fill in your project id in those templates.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``.
SSH into the head node and then run Ray programs with ``ray.init(address="auto")``.

.. code-block:: bash

@@ -59,7 +59,7 @@ This is used when you have a list of machine IP addresses to connect in a Ray cl
Be sure to specify the proper ``head_ip``, list of ``worker_ips``, and the ``ssh_user`` field.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``.
SSH into the head node and then run Ray programs with ``ray.init(address="auto")``.

.. code-block:: bash

@@ -77,7 +77,7 @@ SSH into the head node and then run Ray programs with ``ray.init(address="localh
Running commands on new and existing clusters
---------------------------------------------

You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="localhost:6379")``.
You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="auto")``.

.. code-block:: bash

@@ -261,7 +261,7 @@ with GPU worker nodes instead.

.. code-block:: yaml

min_workers: 1 # must have at least 1 GPU worker (issue #2106)
min_workers: 0 # NOTE: older Ray versions may need 1+ GPU workers (#2106)
max_workers: 10
head_node:
InstanceType: m4.large
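For reference, a minimal sketch of the workflow the updated docs describe: connecting to the running cluster with ``ray.init(address="auto")`` and submitting tasks whose queued demand the autoscaler can now observe. The task body and counts are illustrative, not part of this PR.

```python
import ray

# Connect to the cluster started by `ray up`; "auto" locates the head node's
# address instead of hard-coding localhost:6379.
ray.init(address="auto")

@ray.remote
def square(x):
    return x * x

# Submitting more tasks than the cluster can run at once leaves work queued,
# and that queued demand is what this PR feeds into the autoscaler.
futures = [square.remote(i) for i in range(100)]
print(sum(ray.get(futures)))
```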
9 changes: 8 additions & 1 deletion python/ray/autoscaler/autoscaler.py
@@ -157,9 +157,11 @@ def __init__(self):
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
self.resource_load_by_ip = {}
self.local_ip = services.get_node_ip_address()

def update(self, ip, static_resources, dynamic_resources):
def update(self, ip, static_resources, dynamic_resources, resource_load):
self.resource_load_by_ip[ip] = resource_load
self.static_resources_by_ip[ip] = static_resources

# We are not guaranteed to have a corresponding dynamic resource for
@@ -204,6 +206,7 @@ def prune(mapping):
prune(self.last_used_time_by_ip)
prune(self.static_resources_by_ip)
prune(self.dynamic_resources_by_ip)
prune(self.resource_load_by_ip)
prune(self.last_heartbeat_time_by_ip)

def approx_workers_used(self):
Expand All @@ -218,7 +221,11 @@ def get_resource_usage(self):
resources_total = {}
for ip, max_resources in self.static_resources_by_ip.items():
avail_resources = self.dynamic_resources_by_ip[ip]
resource_load = self.resource_load_by_ip[ip]
max_frac = 0.0
for resource_id, amount in resource_load.items():
if amount > 0:
max_frac = 1.0 # the resource is saturated
for resource_id, amount in max_resources.items():
used = amount - avail_resources[resource_id]
if resource_id not in resources_used:
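Condensed into plain Python, the usage estimate above behaves roughly as follows. The helper name and the exact fallback formula are ours; the key addition from this PR is the first loop, where any non-zero queued amount marks the node as fully used, so queued work alone can trigger scale-up even when the node's own resources look only partly allocated.

```python
def node_fraction_used(static, dynamic, load):
    """Sketch of the per-node usage estimate.

    static: the node's total resources, dynamic: currently free resources,
    load: resources demanded by queued tasks (from the raylet heartbeat).
    """
    max_frac = 0.0
    # New in this PR: any queued demand saturates the node.
    for resource_id, amount in load.items():
        if amount > 0:
            max_frac = 1.0
    # Otherwise fall back to the largest utilisation fraction across the
    # node's own resources (CPU, GPU, ...).
    for resource_id, amount in static.items():
        if amount > 0:
            frac = (amount - dynamic.get(resource_id, amount)) / amount
            max_frac = max(max_frac, frac)
    return max_frac

print(node_fraction_used({"CPU": 2}, {"CPU": 1}, {}))          # 0.5
print(node_fraction_used({"CPU": 2}, {"CPU": 1}, {"CPU": 1}))  # 1.0
```

These two calls mirror the ``testLoadMessages`` expectations further down: one busy CPU out of two counts as half a worker, but adding one CPU of queued load counts the node as a full worker.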
3 changes: 2 additions & 1 deletion python/ray/log_monitor.py
@@ -115,7 +115,8 @@ def update_log_filenames(self):
log_file_paths = glob.glob("{}/worker*[.out|.err]".format(
self.logs_dir))
# segfaults and other serious errors are logged here
raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir))
raylet_err_paths = (glob.glob("{}/raylet*.err".format(self.logs_dir)) +
glob.glob("{}/monitor*.err".format(self.logs_dir)))
for file_path in log_file_paths + raylet_err_paths:
if os.path.isfile(
file_path) and file_path not in self.log_filenames:
6 changes: 5 additions & 1 deletion python/ray/monitor.py
@@ -108,6 +108,9 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
heartbeat_data)
for heartbeat_message in message.batch:
resource_load = dict(
zip(heartbeat_message.resource_load_label,
heartbeat_message.resource_load_capacity))
total_resources = dict(
zip(heartbeat_message.resources_total_label,
heartbeat_message.resources_total_capacity))
@@ -122,7 +125,7 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
ip = self.raylet_id_to_ip_map.get(client_id)
if ip:
self.load_metrics.update(ip, total_resources,
available_resources)
available_resources, resource_load)
else:
logger.warning(
"Monitor: "
@@ -357,6 +360,7 @@ def run(self):
try:
self._run()
except Exception:
logger.exception("Error in monitor loop")
[Review thread on ``logger.exception("Error in monitor loop")``]

Contributor Author: If you don't do this, the error message is lost forever trying to terminate nodes.

Contributor: Does this mean nodes probably weren't cleaned up? If so, would be good to print that in the error message.

Contributor Author: That's done below actually.

Contributor (suggested change): ``logger.exception("Error in monitor loop")`` → ``logger.exception("Error in monitor loop.")``

Contributor Author: I don't believe in periods in log messages.

if self.autoscaler:
self.autoscaler.kill_workers()
raise
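Tracing the two hunks above end to end: the monitor zips the heartbeat's parallel label/capacity lists into dicts and forwards them, together with the new ``resource_load`` dict, to ``LoadMetrics``. A self-contained sketch of that hand-off is below; the sample values and the import path are assumptions based on this era of Ray.

```python
from ray.autoscaler.autoscaler import LoadMetrics  # import path circa this PR

# Heartbeats carry parallel label/capacity lists; zip them into plain dicts
# (the values here are made up for illustration).
load_labels, load_capacities = ["CPU"], [1.0]
resource_load = dict(zip(load_labels, load_capacities))

total_resources = {"CPU": 2.0}      # static resources reported by the node
available_resources = {"CPU": 1.0}  # currently free resources

lm = LoadMetrics()
lm.update("1.1.1.1", total_resources, available_resources, resource_load)

# One busy CPU plus one CPU of queued demand: the node now counts as fully
# used (1.0) rather than half used (0.5).
print(lm.approx_workers_used())
```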
55 changes: 33 additions & 22 deletions python/ray/tests/test_autoscaler.py
@@ -142,45 +142,56 @@ def terminate_node(self, node_id):
class LoadMetricsTest(unittest.TestCase):
def testUpdate(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
[Review thread on this line]

Contributor: Add some test cases for the new metric.

Contributor Author: Yeah, there are a couple entries below in testLoadMessages.

assert lm.approx_workers_used() == 0.5
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
assert lm.approx_workers_used() == 1.0
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
assert lm.approx_workers_used() == 2.0

def testLoadMessages(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
assert lm.approx_workers_used() == 0.5
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1})
assert lm.approx_workers_used() == 1.0
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {})
assert lm.approx_workers_used() == 1.5
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1})
assert lm.approx_workers_used() == 2.0

def testPruneByNodeIp(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0})
lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {})
lm.prune_active_ips({"1.1.1.1", "4.4.4.4"})
assert lm.approx_workers_used() == 1.0

def testBottleneckResource(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
assert lm.approx_workers_used() == 1.88

def testHeartbeat(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
lm.mark_active("2.2.2.2")
assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
assert "3.3.3.3" not in lm.last_heartbeat_time_by_ip

def testDebugString(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
lm.update("3.3.3.3", {
"memory": 20,
"object_store_memory": 40
}, {
"memory": 0,
"object_store_memory": 20
})
}, {})
debug = lm.info_string()
assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, "
"1.05 GiB/1.05 GiB memory, "
@@ -418,8 +429,8 @@ def testAggressiveAutoscaling(self):
tag_filters={TAG_RAY_NODE_TYPE: "worker"}, )
addrs += head_ip
for addr in addrs:
lm.update(addr, {"CPU": 2}, {"CPU": 0})
lm.update(addr, {"CPU": 2}, {"CPU": 2})
lm.update(addr, {"CPU": 2}, {"CPU": 0}, {})
lm.update(addr, {"CPU": 2}, {"CPU": 2}, {})
assert autoscaler.bringup
autoscaler.update()

@@ -428,7 +439,7 @@ def testAggressiveAutoscaling(self):
self.waitForNodes(1)

# All of the nodes are down. Simulate some load on the head node
lm.update(head_ip, {"CPU": 2}, {"CPU": 0})
lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {})

autoscaler.update()
self.waitForNodes(6) # expected due to batch sizes and concurrency
@@ -702,17 +713,17 @@ def testScaleUpBasedOnLoad(self):

# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) # worker 1
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1
autoscaler.update()
self.waitForNodes(3)
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {})
autoscaler.update()
self.waitForNodes(5)

# Holds steady when load is removed
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.non_terminated_nodes({})) == 5
@@ -746,20 +757,20 @@ def testDontScaleBelowTarget(self):

# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
# 1.0 nodes used => target nodes = 2 => target workers = 1
autoscaler.update()
self.waitForNodes(1)

# Make new node idle, and never used.
# Should hold steady as target is still 2.
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0})
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
lm.last_used_time_by_ip["172.0.0.0"] = 0
autoscaler.update()
assert len(self.provider.non_terminated_nodes({})) == 1

# Reduce load on head => target nodes = 1 => target workers = 0
lm.update(local_ip, {"CPU": 2}, {"CPU": 1})
lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {})
autoscaler.update()
assert len(self.provider.non_terminated_nodes({})) == 0

7 changes: 5 additions & 2 deletions src/ray/raylet/scheduling_queue.cc
@@ -131,8 +131,11 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id,
}

ResourceSet SchedulingQueue::GetResourceLoad() const {
// TODO(atumanov): consider other types of tasks as part of load.
return ready_queue_->GetCurrentResourceLoad();
auto load = ready_queue_->GetCurrentResourceLoad();
// Also take into account infeasible tasks so they show up for autoscaling
load.AddResources(
task_queues_[static_cast<int>(TaskState::INFEASIBLE)]->GetCurrentResourceLoad());
return load;
}

const std::unordered_set<TaskID> &SchedulingQueue::GetBlockedTaskIds() const {
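On the raylet side, the reported load is now the ready queue's demand plus the infeasible queue's demand, so tasks that no current node can satisfy still show up and drive scale-up. A Python-flavoured sketch of that merge is below; the function name is ours, and it assumes ``AddResources`` sums quantities per resource, which matches how it is used in the hunk above.

```python
def combined_resource_load(ready_load, infeasible_load):
    """Per-resource sum of ready-queue and infeasible-queue demand."""
    load = dict(ready_load)
    for resource_id, amount in infeasible_load.items():
        load[resource_id] = load.get(resource_id, 0.0) + amount
    return load

# Two ready tasks wanting 1 CPU each plus an infeasible task wanting a GPU
# the cluster doesn't have yet: the GPU demand now reaches the autoscaler.
print(combined_resource_load({"CPU": 2.0}, {"GPU": 1.0}))
# {'CPU': 2.0, 'GPU': 1.0}
```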