Minimal version of piping autoscaler events to driver logs (#13434)

This commit is contained in:
Eric Liang
2021-01-16 10:06:20 -08:00
committed by GitHub
parent 7e54911093
commit 8c8af2616e
13 changed files with 274 additions and 30 deletions
+41 -1
View File
@@ -4,6 +4,7 @@ from urllib3.exceptions import MaxRetryError
import copy
import logging
import math
import operator
import os
import subprocess
import threading
@@ -19,6 +20,7 @@ from ray.autoscaler.tags import (
TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH,
STATUS_SYNCING_FILES, STATUS_SETTING_UP, STATUS_UP_TO_DATE,
NODE_KIND_WORKER, NODE_KIND_UNMANAGED, NODE_KIND_HEAD)
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.legacy_info_string import legacy_log_info_string
from ray.autoscaler._private.providers import _get_node_provider
from ray.autoscaler._private.updater import NodeUpdaterThread
@@ -73,7 +75,8 @@ class StandardAutoscaler:
max_failures=AUTOSCALER_MAX_NUM_FAILURES,
process_runner=subprocess,
update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S,
prefix_cluster_info=False):
prefix_cluster_info=False,
event_summarizer=None):
self.config_path = config_path
# Prefix each line of info string with cluster name if True
self.prefix_cluster_info = prefix_cluster_info
@@ -89,6 +92,7 @@ class StandardAutoscaler:
self.max_launch_batch = max_launch_batch
self.max_concurrent_launches = max_concurrent_launches
self.process_runner = process_runner
self.event_summarizer = event_summarizer or EventSummarizer()
# Map from node_id to NodeUpdater processes
self.updaters = {}
@@ -193,10 +197,20 @@ class StandardAutoscaler:
if node_ip in last_used and last_used[node_ip] < horizon:
logger.info("StandardAutoscaler: "
"{}: Terminating idle node.".format(node_id))
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id)
+ " (idle).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
elif not self.launch_config_ok(node_id):
logger.info("StandardAutoscaler: "
"{}: Terminating outdated node.".format(node_id))
self.event_summarizer.add(
"Removing {} nodes of type " + self._get_node_type(node_id)
+ " (outdated).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
if nodes_to_terminate:
@@ -210,6 +224,11 @@ class StandardAutoscaler:
to_terminate = nodes.pop()
logger.info("StandardAutoscaler: "
"{}: Terminating unneeded node.".format(to_terminate))
self.event_summarizer.add(
"Removing {} nodes of type " +
self._get_node_type(to_terminate) + " (max workers).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(to_terminate)
if nodes_to_terminate:
@@ -246,6 +265,11 @@ class StandardAutoscaler:
else:
logger.error(f"StandardAutoscaler: {node_id}: Terminating "
"failed to setup/initialize node.")
self.event_summarizer.add(
"Removing {} nodes of type " +
self._get_node_type(node_id) + " (launch failed).",
quantity=1,
aggregate=operator.add)
nodes_to_terminate.append(node_id)
self.num_failed_updates[node_id] += 1
del self.updaters[node_id]
@@ -544,6 +568,11 @@ class StandardAutoscaler:
logger.warning("StandardAutoscaler: "
"{}: No recent heartbeat, "
"restarting Ray to recover...".format(node_id))
self.event_summarizer.add(
"Restarting {} nodes of type " + self._get_node_type(node_id) +
" (lost contact with raylet).",
quantity=1,
aggregate=operator.add)
updater = NodeUpdaterThread(
node_id=node_id,
provider_config=self.config["provider"],
@@ -565,6 +594,13 @@ class StandardAutoscaler:
updater.start()
self.updaters[node_id] = updater
def _get_node_type(self, node_id: str) -> str:
node_tags = self.provider.node_tags(node_id)
if TAG_RAY_USER_NODE_TYPE in node_tags:
return node_tags[TAG_RAY_USER_NODE_TYPE]
else:
return "unknown"
def _get_node_type_specific_fields(self, node_id: str,
fields_key: str) -> Any:
fields = self.config[fields_key]
@@ -661,6 +697,10 @@ class StandardAutoscaler:
def launch_new_node(self, count: int, node_type: Optional[str]) -> None:
logger.info(
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
self.event_summarizer.add(
"Adding {} nodes of type " + str(node_type) + ".",
quantity=count,
aggregate=operator.add)
self.pending_launches.inc(node_type, count)
config = copy.deepcopy(self.config)
# Split into individual launch requests of the max batch size.
+4 -2
View File
@@ -131,8 +131,10 @@ def request_resources(num_cpus: Optional[int] = None,
to_request += [{"CPU": 1}] * num_cpus
if bundles:
to_request += bundles
_internal_kv_put(AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
json.dumps(to_request))
_internal_kv_put(
AUTOSCALER_RESOURCE_REQUEST_CHANNEL,
json.dumps(to_request),
overwrite=True)
def create_or_update_cluster(config_file: str,
@@ -12,6 +12,9 @@ def env_integer(key, default):
return default
# Whether event logging to driver is enabled. Set to 0 to disable.
AUTOSCALER_EVENTS = env_integer("AUTOSCALER_EVENTS", 1)
# How long to wait for a node to start, in seconds
NODE_START_WAIT_S = env_integer("AUTOSCALER_NODE_START_WAIT_S", 900)
@@ -0,0 +1,39 @@
from typing import Any, Callable, Dict, List
class EventSummarizer:
"""Utility that aggregates related log messages to reduce log spam."""
def __init__(self):
self.events_by_key: Dict[str, int] = {}
def add(self, template: str, *, quantity: Any,
aggregate: Callable[[Any, Any], Any]) -> None:
"""Add a log message, which will be combined by template.
Args:
template (str): Format string with one placeholder for quantity.
quantity (Any): Quantity to aggregate.
aggregate (func): Aggregation function used to combine the
quantities. The result is inserted into the template to
produce the final log message.
"""
# Enforce proper sentence structure.
if not template.endswith("."):
template += "."
if template in self.events_by_key:
self.events_by_key[template] = aggregate(
self.events_by_key[template], quantity)
else:
self.events_by_key[template] = quantity
def summary(self) -> List[str]:
"""Generate the aggregated log summary of all added events."""
out = []
for template, quantity in self.events_by_key.items():
out.append(template.format(quantity))
return out
def clear(self) -> None:
"""Clear the events added."""
self.events_by_key.clear()
@@ -190,6 +190,19 @@ class LoadMetrics:
def get_pending_placement_groups(self):
return self.pending_placement_groups
def resources_avail_summary(self) -> str:
"""Return a concise string of cluster size to report to event logs.
For example, "3 CPUs, 4 GPUs".
"""
total_resources = reduce(add_resources,
self.static_resources_by_ip.values()
) if self.static_resources_by_ip else {}
out = "{} CPUs".format(int(total_resources.get("CPU", 0)))
if "GPU" in total_resources:
out += ", {} GPUs".format(int(total_resources["GPU"]))
return out
def summary(self):
available_resources = reduce(add_resources,
self.dynamic_resources_by_ip.values()
+3 -1
View File
@@ -311,9 +311,11 @@ def format_pg(pg):
return f"{bundles_str} ({strategy})"
def get_usage_report(lm_summary):
def get_usage_report(lm_summary) -> str:
usage_lines = []
for resource, (used, total) in lm_summary.usage.items():
if "node:" in resource:
continue # Skip the auto-added per-node "node:<ip>" resource.
line = f" {used}/{total} {resource}"
if resource in ["memory", "object_store_memory"]:
to_GiB = ray.ray_constants.MEMORY_RESOURCE_UNIT_BYTES / 2**30
+1 -1
View File
@@ -20,7 +20,7 @@ def _internal_kv_get(key: Union[str, bytes]) -> bytes:
@client_mode_hook
def _internal_kv_put(key: Union[str, bytes],
value: Union[str, bytes],
overwrite: bool = False) -> bool:
overwrite: bool = True) -> bool:
"""Globally associates a value with a given binary key.
This only has an effect if the key does not already have a value.
+6 -1
View File
@@ -125,9 +125,12 @@ class LogMonitor:
log_file_paths = glob.glob(f"{self.logs_dir}/worker*[.out|.err]")
# segfaults and other serious errors are logged here
raylet_err_paths = glob.glob(f"{self.logs_dir}/raylet*.err")
# monitor logs are needed to report autoscaler events
monitor_log_paths = glob.glob(f"{self.logs_dir}/monitor.log")
# If gcs server restarts, there can be multiple log files.
gcs_err_path = glob.glob(f"{self.logs_dir}/gcs_server*.err")
for file_path in log_file_paths + raylet_err_paths + gcs_err_path:
for file_path in (log_file_paths + raylet_err_paths + gcs_err_path +
monitor_log_paths):
if os.path.isfile(
file_path) and file_path not in self.log_filenames:
job_match = JOB_LOG_PATTERN.match(file_path)
@@ -246,6 +249,8 @@ class LogMonitor:
file_info.worker_pid = "raylet"
elif "/gcs_server" in file_info.filename:
file_info.worker_pid = "gcs_server"
elif "/monitor" in file_info.filename:
file_info.worker_pid = "autoscaler"
# Record the current position in the file.
file_info.file_position = file_info.file_handle.tell()
+27 -15
View File
@@ -12,6 +12,7 @@ import ray
from ray.autoscaler._private.autoscaler import StandardAutoscaler
from ray.autoscaler._private.commands import teardown_cluster
from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S
from ray.autoscaler._private.event_summarizer import EventSummarizer
from ray.autoscaler._private.load_metrics import LoadMetrics
from ray.autoscaler._private.constants import \
AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE
@@ -101,16 +102,21 @@ class Monitor:
self.raylet_id_to_ip_map = {}
head_node_ip = redis_address.split(":")[0]
self.load_metrics = LoadMetrics(local_ip=head_node_ip)
self.last_avail_resources = None
self.event_summarizer = EventSummarizer()
if autoscaling_config:
self.autoscaler = StandardAutoscaler(
autoscaling_config,
self.load_metrics,
prefix_cluster_info=prefix_cluster_info)
prefix_cluster_info=prefix_cluster_info,
event_summarizer=self.event_summarizer)
self.autoscaling_config = autoscaling_config
else:
self.autoscaler = None
self.autoscaling_config = None
logger.info("Monitor: Started")
def __del__(self):
"""Destruct the monitor object."""
# We close the pubsub client to avoid leaking file descriptors.
@@ -160,20 +166,6 @@ class Monitor:
except Exception:
logger.exception("Error parsing resource requests")
def autoscaler_resource_request_handler(self, _, data):
"""Handle a notification of a resource request for the autoscaler.
This channel and method are only used by the manual
`ray.autoscaler.sdk.request_resources` api.
Args:
channel: unused
data: a resource request as JSON, e.g. {"CPU": 1}
"""
resource_request = json.loads(data)
self.load_metrics.set_resource_requests(resource_request)
def update_raylet_map(self, _append_port=False):
"""Updates internal raylet map.
@@ -199,6 +191,7 @@ class Monitor:
self.update_raylet_map()
self.update_load_metrics()
self.update_resource_requests()
self.update_event_summary()
status = {
"load_metrics_report": self.load_metrics.summary()._asdict()
}
@@ -210,6 +203,10 @@ class Monitor:
status[
"autoscaler_report"] = self.autoscaler.summary()._asdict()
for msg in self.event_summarizer.summary():
logger.info(":event_summary:{}".format(msg))
self.event_summarizer.clear()
as_json = json.dumps(status)
if _internal_kv_initialized():
_internal_kv_put(
@@ -219,6 +216,21 @@ class Monitor:
# round of messages.
time.sleep(AUTOSCALER_UPDATE_INTERVAL_S)
def update_event_summary(self):
"""Report the current size of the cluster.
To avoid log spam, only cluster size changes (CPU or GPU count change)
are reported to the event summarizer. The event summarizer will report
only the latest cluster size per batch.
"""
avail_resources = self.load_metrics.resources_avail_summary()
if avail_resources != self.last_avail_resources:
self.event_summarizer.add(
"Resized to {}.", # e.g., Resized to 100 CPUs, 4 GPUs.
quantity=avail_resources,
aggregate=lambda old, new: new)
self.last_avail_resources = avail_resources
def destroy_autoscaler_workers(self):
"""Cleanup the autoscaler, in case of an exception in the run() method.
+33
View File
@@ -735,6 +735,12 @@ class AutoscalingTest(unittest.TestCase):
# Eventually reaches steady state
self.waitForNodes(5)
# Check the outdated node removal event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert ("Removing 10 nodes of type "
"ray-legacy-worker-node-type (outdated)." in events), events
def testDynamicScaling(self):
config_path = self.write_config(SMALL_CLUSTER)
self.provider = MockProvider()
@@ -781,6 +787,12 @@ class AutoscalingTest(unittest.TestCase):
self.waitForNodes(
10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Check the launch failure event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert ("Removing 1 nodes of type "
"ray-legacy-worker-node-type (max workers)." in events), events
def testInitialWorkers(self):
"""initial_workers is deprecated, this tests that it is ignored."""
config = SMALL_CLUSTER.copy()
@@ -1287,6 +1299,13 @@ class AutoscalingTest(unittest.TestCase):
# The failed nodes might have been already terminated by autoscaler
assert len(self.provider.non_terminated_nodes({})) < 2
# Check the launch failure event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert (
"Removing 2 nodes of type "
"ray-legacy-worker-node-type (launch failed)." in events), events
def testConfiguresOutdatedNodes(self):
from ray.autoscaler._private.cli_logger import cli_logger
@@ -1408,6 +1427,13 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER})
# Check add/remove events.
events = autoscaler.event_summarizer.summary()
assert ("Adding 5 nodes of type "
"ray-legacy-worker-node-type." in events), events
assert ("Removing 4 nodes of type "
"ray-legacy-worker-node-type (idle)." in events), events
def testTargetUtilizationFraction(self):
config = SMALL_CLUSTER.copy()
config["min_workers"] = 0
@@ -1502,6 +1528,13 @@ class AutoscalingTest(unittest.TestCase):
autoscaler.update()
self.waitFor(lambda: len(runner.calls) > num_calls, num_retries=150)
# Check the node removal event is generated.
autoscaler.update()
events = autoscaler.event_summarizer.summary()
assert ("Restarting 1 nodes of type "
"ray-legacy-worker-node-type (lost contact with raylet)." in
events), events
def testExternalNodeScaler(self):
config = SMALL_CLUSTER.copy()
config["provider"] = {
@@ -491,6 +491,12 @@ class AutoscalingPolicyTest(unittest.TestCase):
# TODO (Alex): Not clear what's actually worth asserting here.
assert simulator.node_costs()
# Check event logs contain add/remove node events.
assert any("Adding" in x
for x in simulator.autoscaler.event_summarizer.summary())
assert any("Removing" in x
for x in simulator.autoscaler.event_summarizer.summary())
def testManyActors(self):
config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG)
config_path = self.write_config(config)
@@ -518,6 +524,12 @@ class AutoscalingPolicyTest(unittest.TestCase):
assert time < 650
# Check event logs contain add/remove node events.
assert any("Adding" in x
for x in simulator.autoscaler.event_summarizer.summary())
assert any("Removing" in x
for x in simulator.autoscaler.event_summarizer.summary())
def testManyPlacementGroups(self):
config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG)
config_path = self.write_config(config)
@@ -584,6 +596,12 @@ class AutoscalingPolicyTest(unittest.TestCase):
assert time < 630
# Check event logs contain add/remove node events.
assert any("Adding" in x
for x in simulator.autoscaler.event_summarizer.summary())
assert any("Removing" in x
for x in simulator.autoscaler.event_summarizer.summary())
if __name__ == "__main__":
import sys
+7
View File
@@ -75,9 +75,13 @@ def setup_monitor(address):
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
request_resources(num_cpus=42)
# Disable event clearing for test.
monitor.event_summarizer.clear = lambda *a: None
while True:
monitor.update_load_metrics()
monitor.update_resource_requests()
monitor.update_event_summary()
resource_usage = monitor.load_metrics._get_resource_usage()
# Check resource request propagation.
@@ -113,6 +117,9 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
raise ValueError("Timeout. {} != {}".format(
resource_usage, expected_resource_usage))
# Sanity check we emitted a resize event.
assert any("Resized to" in x for x in monitor.event_summarizer.summary())
return resource_usage
+79 -9
View File
@@ -14,8 +14,10 @@ import sys
import threading
import time
import traceback
from typing import Any, Dict, List, Iterator
# Ray modules
from ray.autoscaler._private.constants import AUTOSCALER_EVENTS
import ray.cloudpickle as pickle
import ray.gcs_utils
import ray.memory_monitor as memory_monitor
@@ -925,25 +927,93 @@ def print_to_stdstream(data):
print_worker_logs(data, print_file)
def print_worker_logs(data, print_file):
def color_for(data):
# Start time of this process, used for relative time logs.
t0 = time.time()
autoscaler_log_fyi_printed = False
def filter_autoscaler_events(lines: List[str]) -> Iterator[str]:
"""Given raw log lines from the monitor, return only autoscaler events.
Autoscaler events are denoted by the ":event_summary:" magic token.
"""
global autoscaler_log_fyi_printed
if not AUTOSCALER_EVENTS:
return
# Print out autoscaler events only, ignoring other messages.
for line in lines:
if ":event_summary:" in line:
if not autoscaler_log_fyi_printed:
yield ("Tip: use `ray status` to view detailed "
"autoscaling status. To disable autoscaler event "
"messages, you can set AUTOSCALER_EVENTS=0.")
autoscaler_log_fyi_printed = True
# The event text immediately follows the ":event_summary:"
# magic token.
yield line.split(":event_summary:")[1]
def time_string() -> str:
"""Return the relative time from the start of this job.
For example, 15m30s.
"""
delta = time.time() - t0
hours = 0
minutes = 0
while delta > 3600:
hours += 1
delta -= 3600
while delta > 60:
minutes += 1
delta -= 60
output = ""
if hours:
output += "{}h".format(hours)
if minutes:
output += "{}m".format(minutes)
output += "{}s".format(int(delta))
return output
def print_worker_logs(data: Dict[str, str], print_file: Any):
def prefix_for(data: Dict[str, str]) -> str:
"""The PID prefix for this log line."""
if data["pid"] in ["autoscaler", "raylet"]:
return ""
else:
return "pid="
def color_for(data: Dict[str, str]) -> str:
"""The color for this log line."""
if data["pid"] == "raylet":
return colorama.Fore.YELLOW
elif data["pid"] == "autoscaler":
return colorama.Style.BRIGHT + colorama.Fore.CYAN
else:
return colorama.Fore.CYAN
if data["pid"] == "autoscaler":
pid = "{} +{}".format(data["pid"], time_string())
lines = filter_autoscaler_events(data["lines"])
else:
pid = data["pid"]
lines = data["lines"]
if data["ip"] == data["localhost"]:
for line in data["lines"]:
for line in lines:
print(
"{}{}(pid={}){} {}".format(colorama.Style.DIM, color_for(data),
data["pid"],
colorama.Style.RESET_ALL, line),
"{}{}({}{}){} {}".format(colorama.Style.DIM, color_for(data),
prefix_for(data), pid,
colorama.Style.RESET_ALL, line),
file=print_file)
else:
for line in data["lines"]:
for line in lines:
print(
"{}{}(pid={}, ip={}){} {}".format(
colorama.Style.DIM, color_for(data), data["pid"],
"{}{}({}{}, ip={}){} {}".format(
colorama.Style.DIM, color_for(data), prefix_for(data), pid,
data["ip"], colorama.Style.RESET_ALL, line),
file=print_file)