Skip to content

Commit

Permalink
Improve unschedulable task warning messages by integrating with the autoscaler (#18724)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl authored Sep 24, 2021
1 parent fbf5f5d commit 11a2dfc
Show file tree
Hide file tree
Showing 26 changed files with 664 additions and 276 deletions.
1 change: 0 additions & 1 deletion dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,6 @@ def get_cluster_status():
print(response.json())
assert response.json()["result"]
assert "autoscalingStatus" in response.json()["data"]
assert response.json()["data"]["autoscalingStatus"] is None
assert "autoscalingError" in response.json()["data"]
assert response.json()["data"]["autoscalingError"] is None
assert "clusterStatus" in response.json()["data"]
Expand Down
135 changes: 97 additions & 38 deletions python/ray/autoscaler/_private/autoscaler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from collections import defaultdict, namedtuple, Counter
from typing import Callable, Any, Optional, Dict, List, Set, FrozenSet, Tuple
from typing import Any, Optional, Dict, List, Set, FrozenSet, Tuple, Union, \
Callable
import copy
import logging
import math
Expand Down Expand Up @@ -83,7 +84,8 @@ class StandardAutoscaler:

def __init__(
self,
config_path: str,
# TODO(ekl): require config reader to be a callable always.
config_reader: Union[str, Callable[[], dict]],
load_metrics: LoadMetrics,
max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH,
max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
Expand All @@ -96,22 +98,33 @@ def __init__(
"""Create a StandardAutoscaler.
Args:
config_path: Path to a Ray Autoscaler YAML.
load_metrics: Provides metrics for the Ray cluster.
max_launch_batch: Max number of nodes to launch in one request.
max_concurrent_launches: Max number of nodes that can be concurrently
launched. This value and `max_launch_batch` determine the number
of batches that are used to launch nodes.
max_failures: Number of failures that the autoscaler will tolerate
before exiting.
process_runner: Subprocess-like interface used by the CommandRunner.
update_interval_s: Seconds between running the autoscaling loop.
prefix_cluster_info: Whether to add the cluster name to info strings.
event_summarizer: Utility to consolidate duplicated messages.
prom_metrics: Prometheus metrics for autoscaler-related operations.
config_reader: Path to a Ray Autoscaler YAML, or a function to read
and return the latest config.
load_metrics: Provides metrics for the Ray cluster.
max_launch_batch: Max number of nodes to launch in one request.
max_concurrent_launches: Max number of nodes that can be
concurrently launched. This value and `max_launch_batch`
determine the number of batches that are used to launch nodes.
max_failures: Number of failures that the autoscaler will tolerate
before exiting.
process_runner: Subproc-like interface used by the CommandRunner.
update_interval_s: Seconds between running the autoscaling loop.
prefix_cluster_info: Whether to add the cluster name to info strs.
event_summarizer: Utility to consolidate duplicated messages.
prom_metrics: Prometheus metrics for autoscaler-related operations.
"""

self.config_path = config_path
if isinstance(config_reader, str):
# Auto wrap with file reader.
def read_fn():
with open(config_reader) as f:
new_config = yaml.safe_load(f.read())
return new_config

self.config_reader = read_fn
else:
self.config_reader = config_reader

# Prefix each line of info string with cluster name if True
self.prefix_cluster_info = prefix_cluster_info
# Keep this before self.reset (self.provider needs to be created
Expand Down Expand Up @@ -218,17 +231,32 @@ def _update(self):
self.provider.internal_ip(node_id) for node_id in self.all_workers
])

self.terminate_nodes_to_enforce_config_constraints(now)

self.launch_required_nodes()

if self.disable_node_updaters:
self.terminate_unhealthy_nodes(now)
else:
self.process_completed_updates()
self.update_nodes()
self.attempt_to_recover_unhealthy_nodes(now)
self.set_prometheus_updater_data()
if not self.provider.is_readonly():
self.terminate_nodes_to_enforce_config_constraints(now)

# Dict[NodeType, int], List[ResourceDict]
to_launch, unfulfilled = (
self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
self.load_metrics.get_resource_demand_vector(),
self.load_metrics.get_resource_utilization(),
self.load_metrics.get_pending_placement_groups(),
self.load_metrics.get_static_node_resources_by_ip(),
ensure_min_cluster_size=self.load_metrics.
get_resource_requests()))
self._report_pending_infeasible(unfulfilled)

if not self.provider.is_readonly():
self.launch_required_nodes(to_launch)

if self.disable_node_updaters:
self.terminate_unhealthy_nodes(now)
else:
self.process_completed_updates()
self.update_nodes()
self.attempt_to_recover_unhealthy_nodes(now)
self.set_prometheus_updater_data()

logger.info(self.info_string())
legacy_log_info_string(self, self.workers)
Expand Down Expand Up @@ -354,15 +382,7 @@ def terminate_scheduled_nodes(self):
self.nodes_to_terminate = []
self.update_worker_list()

def launch_required_nodes(self):
to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
self.provider.non_terminated_nodes(tag_filters={}),
self.pending_launches.breakdown(),
self.load_metrics.get_resource_demand_vector(),
self.load_metrics.get_resource_utilization(),
self.load_metrics.get_pending_placement_groups(),
self.load_metrics.get_static_node_resources_by_ip(),
ensure_min_cluster_size=self.load_metrics.get_resource_requests())
def launch_required_nodes(self, to_launch: Dict[NodeType, int]) -> None:
if to_launch:
for node_type, count in to_launch.items():
self.launch_new_node(count, node_type=node_type)
Expand Down Expand Up @@ -452,6 +472,46 @@ def set_prometheus_updater_data(self):
num_recovering += 1
self.prom_metrics.recovering_nodes.set(num_recovering)

def _report_pending_infeasible(self, unfulfilled: List[ResourceDict]):
    """Queue throttled warnings for resource demands that cannot be met.

    Every bundle in ``unfulfilled`` that is not a placement-group bundle
    is classified with ``self.resource_demand_scheduler.is_feasible``:

    * feasible bundles are reported as "cannot be scheduled right now",
      but only while ``self.load_metrics.cluster_full_of_actors_detected``
      is truthy;
    * all other bundles are reported as infeasible errors.

    Messages are handed to ``self.event_summarizer.add_once_per_interval``
    keyed by the sorted resource items, with a 30 second interval.

    Args:
        unfulfilled: List of resource demands that would be unfulfilled
            even after full scale-up.
    """
    waiting_demands = []
    unsatisfiable_demands = []
    for demand in unfulfilled:
        # Bundles carrying a "_group_" resource key belong to placement
        # groups and are not reported here.
        is_pg_bundle = any("_group_" in name for name in demand)
        if not is_pg_bundle:
            if self.resource_demand_scheduler.is_feasible(demand):
                waiting_demands.append(demand)
            else:
                unsatisfiable_demands.append(demand)

    # The "cluster full" flag is only consulted when there is at least one
    # feasible-but-waiting demand to report.
    if (waiting_demands
            and self.load_metrics.cluster_full_of_actors_detected):
        pending_template = (
            "Warning: The following resource request cannot be "
            "scheduled right now: {}. This is likely due to all "
            "cluster resources being claimed by actors. Consider "
            "creating fewer actors or adding more nodes "
            "to this Ray cluster.")
        for demand in waiting_demands:
            self.event_summarizer.add_once_per_interval(
                pending_template.format(demand),
                key="pending_{}".format(sorted(demand.items())),
                interval_s=30)

    infeasible_template = (
        "Error: No available node types can fulfill resource "
        "request {}. Add suitable node types to this cluster to "
        "resolve this issue.")
    for demand in unsatisfiable_demands:
        self.event_summarizer.add_once_per_interval(
            infeasible_template.format(demand),
            key="infeasible_{}".format(sorted(demand.items())),
            interval_s=30)

def _sort_based_on_last_used(self, nodes: List[NodeID],
last_used: Dict[str, float]) -> List[NodeID]:
"""Sort the nodes based on the last time they were used.
Expand Down Expand Up @@ -615,8 +675,7 @@ def reset(self, errors_fatal=False):
sync_continuously = self.config.get(
"file_mounts_sync_continuously", False)
try:
with open(self.config_path) as f:
new_config = yaml.safe_load(f.read())
new_config = self.config_reader()
if new_config != getattr(self, "config", None):
try:
validate_config(new_config)
Expand Down
12 changes: 3 additions & 9 deletions python/ray/autoscaler/_private/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@
import ray.autoscaler._private.subprocess_output_util as cmd_output_util
from ray.autoscaler._private.load_metrics import LoadMetricsSummary
from ray.autoscaler._private.autoscaler import AutoscalerSummary
from ray.autoscaler._private.util import format_info_string, \
format_info_string_no_node_types
from ray.autoscaler._private.util import format_info_string

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -109,13 +108,8 @@ def debug_status(status, error) -> str:
as_dict = json.loads(status)
time = datetime.datetime.fromtimestamp(as_dict["time"])
lm_summary = LoadMetricsSummary(**as_dict["load_metrics_report"])
if "autoscaler_report" in as_dict:
autoscaler_summary = AutoscalerSummary(
**as_dict["autoscaler_report"])
status = format_info_string(
lm_summary, autoscaler_summary, time=time)
else:
status = format_info_string_no_node_types(lm_summary, time=time)
autoscaler_summary = AutoscalerSummary(**as_dict["autoscaler_report"])
status = format_info_string(lm_summary, autoscaler_summary, time=time)
if error:
status += "\n"
status += error.decode("utf-8")
Expand Down
2 changes: 1 addition & 1 deletion python/ray/autoscaler/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def env_integer(key, default):


# Whether event logging to driver is enabled. Set to 0 to disable.
AUTOSCALER_EVENTS = env_integer("AUTOSCALER_EVENTS", 1)
AUTOSCALER_EVENTS = env_integer("RAY_SCHEDULER_EVENTS", 1)

# Whether to avoid launching GPU nodes for CPU only tasks.
AUTOSCALER_CONSERVE_GPU_NODES = env_integer("AUTOSCALER_CONSERVE_GPU_NODES", 1)
Expand Down
25 changes: 25 additions & 0 deletions python/ray/autoscaler/_private/event_summarizer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from typing import Any, Callable, Dict, List
import time


class EventSummarizer:
"""Utility that aggregates related log messages to reduce log spam."""

def __init__(self):
    # Aggregated events: message template -> accumulated quantity.
    self.events_by_key: Dict[str, int] = {}
    # One-off messages queued for the next summary batch.
    self.messages_to_send: List[str] = []
    # Throttle table: key -> expiry timestamp. While a key is present
    # here its message is not queued again; entries are dropped once
    # their TTL has passed.
    self.throttled_messages: Dict[str, float] = {}

def add(self, template: str, *, quantity: Any,
aggregate: Callable[[Any, Any], Any]) -> None:
Expand All @@ -27,13 +33,32 @@ def add(self, template: str, *, quantity: Any,
else:
self.events_by_key[template] = quantity

def add_once_per_interval(self, message: str, key: str, interval_s: int):
    """Queue a log message unless its key is currently throttled.

    The first call for a given key queues the message and records an
    expiry of ``time.time() + interval_s`` for that key. Further calls
    with the same key are ignored for as long as the key stays in
    ``self.throttled_messages``.

    Args:
        message (str): The message to log.
        key (str): The key to use to deduplicate the message.
        interval_s (int): Throttling interval in seconds.
    """
    if key in self.throttled_messages:
        # An entry for this key already exists: drop the message.
        return
    self.throttled_messages[key] = time.time() + interval_s
    self.messages_to_send.append(message)

def summary(self) -> List[str]:
    """Build the log lines for everything added since the last clear.

    Returns:
        One formatted line per aggregated event (the template formatted
        with its accumulated quantity), followed by the queued throttled
        messages in the order they were added.
    """
    aggregated = [
        template.format(quantity)
        for template, quantity in self.events_by_key.items()
    ]
    return [*aggregated, *self.messages_to_send]

def clear(self) -> None:
    """Reset the pending summary and expire finished throttle entries.

    Aggregated events and queued messages are discarded. Throttle
    entries whose expiry timestamp is in the past are removed, which
    allows a message with that key to be queued again.
    """
    self.events_by_key.clear()
    self.messages_to_send.clear()
    # Collect the expired keys first so the dict is never mutated while
    # it is being iterated.
    expired_keys = [
        key for key, expires_at in self.throttled_messages.items()
        if time.time() > expires_at
    ]
    for key in expired_keys:
        del self.throttled_messages[key]
5 changes: 4 additions & 1 deletion python/ray/autoscaler/_private/load_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, local_ip=None):
self.infeasible_bundles = []
self.pending_placement_groups = []
self.resource_requests = []
self.cluster_full_of_actors_detected = False

def update(self,
ip: str,
Expand All @@ -91,9 +92,11 @@ def update(self,
resource_load: Dict[str, Dict],
waiting_bundles: List[Dict[str, float]] = None,
infeasible_bundles: List[Dict[str, float]] = None,
pending_placement_groups: List[PlacementGroupTableData] = None):
pending_placement_groups: List[PlacementGroupTableData] = None,
cluster_full_of_actors_detected: bool = False):
self.resource_load_by_ip[ip] = resource_load
self.static_resources_by_ip[ip] = static_resources
self.cluster_full_of_actors_detected = cluster_full_of_actors_detected

if not waiting_bundles:
waiting_bundles = []
Expand Down
Loading

0 comments on commit 11a2dfc

Please sign in to comment.