Improve unschedulable task warning messages by integrating with the autoscaler #18724

Merged · 50 commits · Sep 24, 2021
Changes from 34 commits
Commits (50)
f8cd230
update
ericl Sep 15, 2021
9e72942
wip
ericl Sep 16, 2021
b4c7cd6
wip
ericl Sep 16, 2021
c88624c
wip
ericl Sep 16, 2021
f565373
wip
ericl Sep 17, 2021
dd2dc6b
wip
ericl Sep 17, 2021
cb6a04f
wip
ericl Sep 17, 2021
be74542
add integration
ericl Sep 17, 2021
e4b1f25
update
ericl Sep 17, 2021
a9c8e52
wip
ericl Sep 17, 2021
b10a7b6
update
ericl Sep 17, 2021
99d1899
wip
ericl Sep 17, 2021
d9bbe51
wip
ericl Sep 17, 2021
ae0f3c8
update
ericl Sep 17, 2021
06f8abd
wip
ericl Sep 17, 2021
461bd65
update
ericl Sep 17, 2021
11e0fe7
update
ericl Sep 17, 2021
fa684d1
update
ericl Sep 17, 2021
43fa443
Merge remote-tracking branch 'upstream/master' into local-autoscaler
ericl Sep 22, 2021
6c331b1
resource deadlock name change
ericl Sep 22, 2021
c5d1da5
merge
ericl Sep 22, 2021
62d4a47
wip
ericl Sep 22, 2021
98c7379
comment
ericl Sep 22, 2021
0fc8fe5
update
ericl Sep 22, 2021
475c951
forward compat
ericl Sep 22, 2021
17baa09
wip
ericl Sep 22, 2021
de86aa2
rds unit tests
ericl Sep 22, 2021
42429ce
update
ericl Sep 22, 2021
19d8314
update
ericl Sep 22, 2021
10666e8
lint
ericl Sep 22, 2021
1c47b62
indent
ericl Sep 22, 2021
3458423
lint
ericl Sep 22, 2021
9018a21
wip
ericl Sep 23, 2021
3deb2cd
remove
ericl Sep 23, 2021
ee4b491
wip
ericl Sep 23, 2021
987601b
wip
ericl Sep 23, 2021
8dd1975
wip
ericl Sep 23, 2021
9d9a1b2
wi
ericl Sep 23, 2021
15e4842
fix
ericl Sep 23, 2021
e0cbd68
fix
ericl Sep 24, 2021
9d10efe
update
ericl Sep 24, 2021
dcbb7c3
lint
ericl Sep 24, 2021
b4e5652
Merge remote-tracking branch 'upstream/master' into local-autoscaler
ericl Sep 24, 2021
e965925
fixc
ericl Sep 24, 2021
4ea61ac
remove
ericl Sep 24, 2021
bd20dc6
fix
ericl Sep 24, 2021
aa935d7
fix copy issue
ericl Sep 24, 2021
c28e8ea
update
ericl Sep 24, 2021
bf7d3f9
update
ericl Sep 24, 2021
4c7e940
Merge remote-tracking branch 'upstream/master' into local-autoscaler
ericl Sep 24, 2021
125 changes: 87 additions & 38 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -1,5 +1,6 @@
 from collections import defaultdict, namedtuple, Counter
-from typing import Callable, Any, Optional, Dict, List, Set, FrozenSet, Tuple
+from typing import Any, Optional, Dict, List, Set, FrozenSet, Tuple, Union, \
+    Callable
 import copy
 import logging
 import math
@@ -83,7 +84,7 @@ class StandardAutoscaler:
 
     def __init__(
             self,
-            config_path: str,
+            config_reader: Union[str, Callable[[], dict]],
             load_metrics: LoadMetrics,
             max_launch_batch: int = AUTOSCALER_MAX_LAUNCH_BATCH,
             max_concurrent_launches: int = AUTOSCALER_MAX_CONCURRENT_LAUNCHES,
@@ -96,22 +97,33 @@ def __init__(
         """Create a StandardAutoscaler.
 
         Args:
-            config_path: Path to a Ray Autoscaler YAML.
-            load_metrics: Provides metrics for the Ray cluster.
-            max_launch_batch: Max number of nodes to launch in one request.
-            max_concurrent_launches: Max number of nodes that can be concurrently
-                launched. This value and `max_launch_batch` determine the number
-                of batches that are used to launch nodes.
-            max_failures: Number of failures that the autoscaler will tolerate
-                before exiting.
-            process_runner: Subprocess-like interface used by the CommandRunner.
-            update_interval_s: Seconds between running the autoscaling loop.
-            prefix_cluster_info: Whether to add the cluster name to info strings.
-            event_summarizer: Utility to consolidate duplicated messages.
-            prom_metrics: Prometheus metrics for autoscaler-related operations.
+            config_reader: Path to a Ray Autoscaler YAML, or a function to read
+                and return the latest config.
+            load_metrics: Provides metrics for the Ray cluster.
+            max_launch_batch: Max number of nodes to launch in one request.
+            max_concurrent_launches: Max number of nodes that can be
+                concurrently launched. This value and `max_launch_batch`
+                determine the number of batches that are used to launch nodes.
+            max_failures: Number of failures that the autoscaler will tolerate
+                before exiting.
+            process_runner: Subproc-like interface used by the CommandRunner.
+            update_interval_s: Seconds between running the autoscaling loop.
+            prefix_cluster_info: Whether to add the cluster name to info strs.
+            event_summarizer: Utility to consolidate duplicated messages.
+            prom_metrics: Prometheus metrics for autoscaler-related operations.
         """
 
-        self.config_path = config_path
+        if isinstance(config_reader, str):
+            # Auto wrap with file reader.
+            def read_fn():
+                with open(config_reader) as f:
+                    new_config = yaml.safe_load(f.read())
+                return new_config
+
+            self.config_reader = read_fn
+        else:
+            self.config_reader = config_reader
+
         # Prefix each line of info string with cluster name if True
         self.prefix_cluster_info = prefix_cluster_info
         # Keep this before self.reset (self.provider needs to be created
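
For callers, this change is backward compatible: a YAML path behaves exactly as the old config_path did, while a callable lets tests or a wrapping service supply the config from memory. A minimal usage sketch, assuming a LoadMetrics instance named load_metrics already exists; the stub config dict is illustrative, not a valid cluster config:

    # A path still works; it is auto-wrapped with a file reader as above.
    autoscaler = StandardAutoscaler(
        config_reader="/path/to/cluster.yaml", load_metrics=load_metrics)

    # A zero-argument callable is re-invoked on every reset(), so config
    # changes made in memory are picked up without touching the filesystem.
    config = {"max_workers": 10}  # illustrative stub
    autoscaler = StandardAutoscaler(
        config_reader=lambda: config, load_metrics=load_metrics)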
@@ -218,17 +230,31 @@ def _update(self):
             self.provider.internal_ip(node_id) for node_id in self.all_workers
         ])
 
-        self.terminate_nodes_to_enforce_config_constraints(now)
-
-        self.launch_required_nodes()
-
-        if self.disable_node_updaters:
-            self.terminate_unhealthy_nodes(now)
-        else:
-            self.process_completed_updates()
-            self.update_nodes()
-            self.attempt_to_recover_unhealthy_nodes(now)
-            self.set_prometheus_updater_data()
+        if not self.provider.is_readonly():
+            self.terminate_nodes_to_enforce_config_constraints(now)
+
+        to_launch, unfulfilled = (
+            self.resource_demand_scheduler.get_nodes_to_launch(
+                self.provider.non_terminated_nodes(tag_filters={}),
+                self.pending_launches.breakdown(),
+                self.load_metrics.get_resource_demand_vector(),
+                self.load_metrics.get_resource_utilization(),
+                self.load_metrics.get_pending_placement_groups(),
+                self.load_metrics.get_static_node_resources_by_ip(),
+                ensure_min_cluster_size=self.load_metrics.
+                get_resource_requests()))
+        self._report_pending_infeasible(unfulfilled)
+
+        if not self.provider.is_readonly():
+            self.launch_required_nodes(to_launch)
+
+        if self.disable_node_updaters:
+            self.terminate_unhealthy_nodes(now)
+        else:
+            self.process_completed_updates()
+            self.update_nodes()
+            self.attempt_to_recover_unhealthy_nodes(now)
+            self.set_prometheus_updater_data()
 
         logger.info(self.info_string())
         legacy_log_info_string(self, self.workers)
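
Splitting get_nodes_to_launch() out of the launch step is what exposes the data the new warnings need: to_launch maps node types to launch counts, while unfulfilled carries the resource bundles that cannot currently be placed. An illustrative sketch of the two shapes, with hypothetical node type names and values:

    # Hypothetical result for a cluster that can still add CPU nodes but
    # has no GPU node type configured.
    to_launch = {"cpu_4_node": 2}  # node type -> count, for launch_required_nodes()
    unfulfilled = [{"GPU": 1.0}]   # bundles fed to _report_pending_infeasible()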
@@ -354,15 +380,7 @@ def terminate_scheduled_nodes(self):
         self.nodes_to_terminate = []
         self.update_worker_list()
 
-    def launch_required_nodes(self):
-        to_launch = self.resource_demand_scheduler.get_nodes_to_launch(
-            self.provider.non_terminated_nodes(tag_filters={}),
-            self.pending_launches.breakdown(),
-            self.load_metrics.get_resource_demand_vector(),
-            self.load_metrics.get_resource_utilization(),
-            self.load_metrics.get_pending_placement_groups(),
-            self.load_metrics.get_static_node_resources_by_ip(),
-            ensure_min_cluster_size=self.load_metrics.get_resource_requests())
+    def launch_required_nodes(self, to_launch):
         if to_launch:
             for node_type, count in to_launch.items():
                 self.launch_new_node(count, node_type=node_type)
@@ -452,6 +470,38 @@ def set_prometheus_updater_data(self):
                 num_recovering += 1
         self.prom_metrics.recovering_nodes.set(num_recovering)
 
+    def _report_pending_infeasible(self, resources: List[ResourceDict]):
+        """Emit event messages for infeasible or unschedulable tasks."""
+        pending = []
+        infeasible = []
+        for bundle in resources:
+            placement_group = any("_group_" in k for k in bundle)
+            if placement_group:
+                continue
+            if self.resource_demand_scheduler.is_feasible(bundle):
+                pending.append(bundle)
+            else:
+                infeasible.append(bundle)
+        if pending:
+            if self.load_metrics.cluster_full_reported:
+                for request in pending:
+                    self.event_summarizer.add_once_per_interval(
+                        "Warning: The following resource request cannot be "
+                        "scheduled right now: {}. This is likely due to all "
+                        "cluster resources being claimed by actors. Consider "
+                        "creating fewer actors or adding more nodes "
+                        "to this Ray cluster.".format(request),
+                        key="pending_{}".format(sorted(request.items())),
+                        interval_s=30)
+        if infeasible:
+            for request in infeasible:
+                self.event_summarizer.add_once_per_interval(
+                    "Error: No available node types can fulfill resource "
+                    "request {}. Add suitable node types to this cluster to "
+                    "resolve this issue.".format(request),
+                    key="infeasible_{}".format(sorted(request.items())),
+                    interval_s=30)
+
     def _sort_based_on_last_used(self, nodes: List[NodeID],
                                  last_used: Dict[str, float]) -> List[NodeID]:
         """Sort the nodes based on the last time they were used.
@@ -615,8 +665,7 @@ def reset(self, errors_fatal=False):
             sync_continuously = self.config.get(
                 "file_mounts_sync_continuously", False)
         try:
-            with open(self.config_path) as f:
-                new_config = yaml.safe_load(f.read())
+            new_config = self.config_reader()
             if new_config != getattr(self, "config", None):
                 try:
                     validate_config(new_config)
12 changes: 3 additions & 9 deletions python/ray/autoscaler/_private/commands.py
@@ -55,8 +55,7 @@
 import ray.autoscaler._private.subprocess_output_util as cmd_output_util
 from ray.autoscaler._private.load_metrics import LoadMetricsSummary
 from ray.autoscaler._private.autoscaler import AutoscalerSummary
-from ray.autoscaler._private.util import format_info_string, \
-    format_info_string_no_node_types
+from ray.autoscaler._private.util import format_info_string
 
 logger = logging.getLogger(__name__)
 
@@ -109,13 +108,8 @@ def debug_status(status, error) -> str:
     as_dict = json.loads(status)
     time = datetime.datetime.fromtimestamp(as_dict["time"])
     lm_summary = LoadMetricsSummary(**as_dict["load_metrics_report"])
-    if "autoscaler_report" in as_dict:
-        autoscaler_summary = AutoscalerSummary(
-            **as_dict["autoscaler_report"])
-        status = format_info_string(
-            lm_summary, autoscaler_summary, time=time)
-    else:
-        status = format_info_string_no_node_types(lm_summary, time=time)
+    autoscaler_summary = AutoscalerSummary(**as_dict["autoscaler_report"])
+    status = format_info_string(lm_summary, autoscaler_summary, time=time)
     if error:
         status += "\n"
         status += error.decode("utf-8")
2 changes: 1 addition & 1 deletion python/ray/autoscaler/_private/constants.py
@@ -18,7 +18,7 @@ def env_integer(key, default):
 
 
 # Whether event logging to driver is enabled. Set to 0 to disable.
-AUTOSCALER_EVENTS = env_integer("AUTOSCALER_EVENTS", 1)
+AUTOSCALER_EVENTS = env_integer("RAY_SCHEDULER_EVENTS", 1)
 
 # Whether to avoid launching GPU nodes for CPU only tasks.
 AUTOSCALER_CONSERVE_GPU_NODES = env_integer("AUTOSCALER_CONSERVE_GPU_NODES", 1)
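
After the rename, the knob that silences these driver-side event messages is the RAY_SCHEDULER_EVENTS environment variable; the old AUTOSCALER_EVENTS variable name is no longer read here. A sketch, assuming the variable is set before the constants module is imported, since env_integer() reads the environment at import time:

    import os

    # "0" disables scheduler/autoscaler event logging to the driver.
    os.environ["RAY_SCHEDULER_EVENTS"] = "0"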
20 changes: 20 additions & 0 deletions python/ray/autoscaler/_private/event_summarizer.py
@@ -1,11 +1,14 @@
 from typing import Any, Callable, Dict, List
+import time
 
 
 class EventSummarizer:
     """Utility that aggregates related log messages to reduce log spam."""
 
     def __init__(self):
         self.events_by_key: Dict[str, int] = {}
+        self.messages: List[str] = []
+        self.key_ttl: Dict[str, float] = {}
 
     def add(self, template: str, *, quantity: Any,
             aggregate: Callable[[Any, Any], Any]) -> None:
@@ -27,13 +30,30 @@ def add(self, template: str, *, quantity: Any,
         else:
             self.events_by_key[template] = quantity
 
+    def add_once_per_interval(self, message: str, key: str, interval_s: int):
+        """Add a log message, which is throttled once per interval by a key.
+
+        Args:
+            message (str): The message to log.
+            key (str): The key to use to deduplicate the message.
+            interval_s (int): Throttling interval in seconds.
+        """
+        if key not in self.key_ttl:
+            self.key_ttl[key] = time.time() + interval_s
+            self.messages.append(message)
+
     def summary(self) -> List[str]:
         """Generate the aggregated log summary of all added events."""
         out = []
         for template, quantity in self.events_by_key.items():
             out.append(template.format(quantity))
+        out.extend(self.messages)
         return out
 
     def clear(self) -> None:
         """Clear the events added."""
         self.events_by_key.clear()
+        self.messages.clear()
+        for k, t in list(self.key_ttl.items()):
+            if time.time() > t:
+                del self.key_ttl[k]
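
Together, the messages list and the TTL map give per-key throttling: a message is queued only when its key has no live TTL entry, and clear() flushes the queued messages while dropping only expired keys, so a key stays suppressed until its interval elapses. A usage sketch based solely on the methods above:

    s = EventSummarizer()
    s.add_once_per_interval("Warning: ...", key="pending_gpu", interval_s=30)
    # A second add within 30s under the same key is a no-op.
    s.add_once_per_interval("Warning: ...", key="pending_gpu", interval_s=30)
    assert len(s.summary()) == 1
    s.clear()  # drops the queued message, but "pending_gpu" stays
               # throttled until its 30s TTL expires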
5 changes: 4 additions & 1 deletion python/ray/autoscaler/_private/load_metrics.py
@@ -83,6 +83,7 @@ def __init__(self, local_ip=None):
         self.infeasible_bundles = []
         self.pending_placement_groups = []
         self.resource_requests = []
+        self.cluster_full_reported = False
 
     def update(self,
                ip: str,
@@ -91,9 +92,11 @@ def update(self,
                resource_load: Dict[str, Dict],
                waiting_bundles: List[Dict[str, float]] = None,
                infeasible_bundles: List[Dict[str, float]] = None,
-               pending_placement_groups: List[PlacementGroupTableData] = None):
+               pending_placement_groups: List[PlacementGroupTableData] = None,
+               cluster_full_reported: bool = False):
         self.resource_load_by_ip[ip] = resource_load
         self.static_resources_by_ip[ip] = static_resources
+        self.cluster_full_reported = cluster_full_reported
 
         if not waiting_bundles:
             waiting_bundles = []
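
Because the flag defaults to False and is overwritten on each update(), it always mirrors the most recent raylet heartbeat. A tiny sketch of the default, using this module's import path:

    from ray.autoscaler._private.load_metrics import LoadMetrics

    lm = LoadMetrics()
    assert lm.cluster_full_reported is False  # flips only when a heartbeat
                                              # reports the cluster as full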