From 8c8af2616e70ce66c9ebb5094b85c9cd869ef6c6 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 16 Jan 2021 10:06:20 -0800 Subject: [PATCH] Minimal version of piping autoscaler events to driver logs (#13434) --- python/ray/autoscaler/_private/autoscaler.py | 42 ++++++++- python/ray/autoscaler/_private/commands.py | 6 +- python/ray/autoscaler/_private/constants.py | 3 + .../autoscaler/_private/event_summarizer.py | 39 ++++++++ .../ray/autoscaler/_private/load_metrics.py | 13 +++ python/ray/autoscaler/_private/util.py | 4 +- python/ray/experimental/internal_kv.py | 2 +- python/ray/log_monitor.py | 7 +- python/ray/monitor.py | 42 +++++---- python/ray/tests/test_autoscaler.py | 33 +++++++ python/ray/tests/test_autoscaling_policy.py | 18 ++++ python/ray/tests/test_multi_node_2.py | 7 ++ python/ray/worker.py | 88 +++++++++++++++++-- 13 files changed, 274 insertions(+), 30 deletions(-) create mode 100644 python/ray/autoscaler/_private/event_summarizer.py diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py index 56c8fa634..2838e24c1 100644 --- a/python/ray/autoscaler/_private/autoscaler.py +++ b/python/ray/autoscaler/_private/autoscaler.py @@ -4,6 +4,7 @@ from urllib3.exceptions import MaxRetryError import copy import logging import math +import operator import os import subprocess import threading @@ -19,6 +20,7 @@ from ray.autoscaler.tags import ( TAG_RAY_USER_NODE_TYPE, STATUS_UNINITIALIZED, STATUS_WAITING_FOR_SSH, STATUS_SYNCING_FILES, STATUS_SETTING_UP, STATUS_UP_TO_DATE, NODE_KIND_WORKER, NODE_KIND_UNMANAGED, NODE_KIND_HEAD) +from ray.autoscaler._private.event_summarizer import EventSummarizer from ray.autoscaler._private.legacy_info_string import legacy_log_info_string from ray.autoscaler._private.providers import _get_node_provider from ray.autoscaler._private.updater import NodeUpdaterThread @@ -73,7 +75,8 @@ class StandardAutoscaler: max_failures=AUTOSCALER_MAX_NUM_FAILURES, process_runner=subprocess, update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S, - prefix_cluster_info=False): + prefix_cluster_info=False, + event_summarizer=None): self.config_path = config_path # Prefix each line of info string with cluster name if True self.prefix_cluster_info = prefix_cluster_info @@ -89,6 +92,7 @@ class StandardAutoscaler: self.max_launch_batch = max_launch_batch self.max_concurrent_launches = max_concurrent_launches self.process_runner = process_runner + self.event_summarizer = event_summarizer or EventSummarizer() # Map from node_id to NodeUpdater processes self.updaters = {} @@ -193,10 +197,20 @@ class StandardAutoscaler: if node_ip in last_used and last_used[node_ip] < horizon: logger.info("StandardAutoscaler: " "{}: Terminating idle node.".format(node_id)) + self.event_summarizer.add( + "Removing {} nodes of type " + self._get_node_type(node_id) + + " (idle).", + quantity=1, + aggregate=operator.add) nodes_to_terminate.append(node_id) elif not self.launch_config_ok(node_id): logger.info("StandardAutoscaler: " "{}: Terminating outdated node.".format(node_id)) + self.event_summarizer.add( + "Removing {} nodes of type " + self._get_node_type(node_id) + + " (outdated).", + quantity=1, + aggregate=operator.add) nodes_to_terminate.append(node_id) if nodes_to_terminate: @@ -210,6 +224,11 @@ class StandardAutoscaler: to_terminate = nodes.pop() logger.info("StandardAutoscaler: " "{}: Terminating unneeded node.".format(to_terminate)) + self.event_summarizer.add( + "Removing {} nodes of type " + + self._get_node_type(to_terminate) + " (max workers).", + quantity=1, + aggregate=operator.add) nodes_to_terminate.append(to_terminate) if nodes_to_terminate: @@ -246,6 +265,11 @@ class StandardAutoscaler: else: logger.error(f"StandardAutoscaler: {node_id}: Terminating " "failed to setup/initialize node.") + self.event_summarizer.add( + "Removing {} nodes of type " + + self._get_node_type(node_id) + " (launch failed).", + quantity=1, + aggregate=operator.add) nodes_to_terminate.append(node_id) self.num_failed_updates[node_id] += 1 del self.updaters[node_id] @@ -544,6 +568,11 @@ class StandardAutoscaler: logger.warning("StandardAutoscaler: " "{}: No recent heartbeat, " "restarting Ray to recover...".format(node_id)) + self.event_summarizer.add( + "Restarting {} nodes of type " + self._get_node_type(node_id) + + " (lost contact with raylet).", + quantity=1, + aggregate=operator.add) updater = NodeUpdaterThread( node_id=node_id, provider_config=self.config["provider"], @@ -565,6 +594,13 @@ class StandardAutoscaler: updater.start() self.updaters[node_id] = updater + def _get_node_type(self, node_id: str) -> str: + node_tags = self.provider.node_tags(node_id) + if TAG_RAY_USER_NODE_TYPE in node_tags: + return node_tags[TAG_RAY_USER_NODE_TYPE] + else: + return "unknown" + def _get_node_type_specific_fields(self, node_id: str, fields_key: str) -> Any: fields = self.config[fields_key] @@ -661,6 +697,10 @@ class StandardAutoscaler: def launch_new_node(self, count: int, node_type: Optional[str]) -> None: logger.info( "StandardAutoscaler: Queue {} new nodes for launch".format(count)) + self.event_summarizer.add( + "Adding {} nodes of type " + str(node_type) + ".", + quantity=count, + aggregate=operator.add) self.pending_launches.inc(node_type, count) config = copy.deepcopy(self.config) # Split into individual launch requests of the max batch size. diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 5647b717a..df0a10449 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -131,8 +131,10 @@ def request_resources(num_cpus: Optional[int] = None, to_request += [{"CPU": 1}] * num_cpus if bundles: to_request += bundles - _internal_kv_put(AUTOSCALER_RESOURCE_REQUEST_CHANNEL, - json.dumps(to_request)) + _internal_kv_put( + AUTOSCALER_RESOURCE_REQUEST_CHANNEL, + json.dumps(to_request), + overwrite=True) def create_or_update_cluster(config_file: str, diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index ac0e97124..3fd3ec65e 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -12,6 +12,9 @@ def env_integer(key, default): return default +# Whether event logging to driver is enabled. Set to 0 to disable. +AUTOSCALER_EVENTS = env_integer("AUTOSCALER_EVENTS", 1) + # How long to wait for a node to start, in seconds NODE_START_WAIT_S = env_integer("AUTOSCALER_NODE_START_WAIT_S", 900) diff --git a/python/ray/autoscaler/_private/event_summarizer.py b/python/ray/autoscaler/_private/event_summarizer.py new file mode 100644 index 000000000..d877bf424 --- /dev/null +++ b/python/ray/autoscaler/_private/event_summarizer.py @@ -0,0 +1,39 @@ +from typing import Any, Callable, Dict, List + + +class EventSummarizer: + """Utility that aggregates related log messages to reduce log spam.""" + + def __init__(self): + self.events_by_key: Dict[str, int] = {} + + def add(self, template: str, *, quantity: Any, + aggregate: Callable[[Any, Any], Any]) -> None: + """Add a log message, which will be combined by template. + + Args: + template (str): Format string with one placeholder for quantity. + quantity (Any): Quantity to aggregate. + aggregate (func): Aggregation function used to combine the + quantities. The result is inserted into the template to + produce the final log message. + """ + # Enforce proper sentence structure. + if not template.endswith("."): + template += "." + if template in self.events_by_key: + self.events_by_key[template] = aggregate( + self.events_by_key[template], quantity) + else: + self.events_by_key[template] = quantity + + def summary(self) -> List[str]: + """Generate the aggregated log summary of all added events.""" + out = [] + for template, quantity in self.events_by_key.items(): + out.append(template.format(quantity)) + return out + + def clear(self) -> None: + """Clear the events added.""" + self.events_by_key.clear() diff --git a/python/ray/autoscaler/_private/load_metrics.py b/python/ray/autoscaler/_private/load_metrics.py index dc3178015..bf9dc564b 100644 --- a/python/ray/autoscaler/_private/load_metrics.py +++ b/python/ray/autoscaler/_private/load_metrics.py @@ -190,6 +190,19 @@ class LoadMetrics: def get_pending_placement_groups(self): return self.pending_placement_groups + def resources_avail_summary(self) -> str: + """Return a concise string of cluster size to report to event logs. + + For example, "3 CPUs, 4 GPUs". + """ + total_resources = reduce(add_resources, + self.static_resources_by_ip.values() + ) if self.static_resources_by_ip else {} + out = "{} CPUs".format(int(total_resources.get("CPU", 0))) + if "GPU" in total_resources: + out += ", {} GPUs".format(int(total_resources["GPU"])) + return out + def summary(self): available_resources = reduce(add_resources, self.dynamic_resources_by_ip.values() diff --git a/python/ray/autoscaler/_private/util.py b/python/ray/autoscaler/_private/util.py index 1ab7c2e68..81a2c1fc0 100644 --- a/python/ray/autoscaler/_private/util.py +++ b/python/ray/autoscaler/_private/util.py @@ -311,9 +311,11 @@ def format_pg(pg): return f"{bundles_str} ({strategy})" -def get_usage_report(lm_summary): +def get_usage_report(lm_summary) -> str: usage_lines = [] for resource, (used, total) in lm_summary.usage.items(): + if "node:" in resource: + continue # Skip the auto-added per-node "node:" resource. line = f" {used}/{total} {resource}" if resource in ["memory", "object_store_memory"]: to_GiB = ray.ray_constants.MEMORY_RESOURCE_UNIT_BYTES / 2**30 diff --git a/python/ray/experimental/internal_kv.py b/python/ray/experimental/internal_kv.py index 388d06037..deb33902d 100644 --- a/python/ray/experimental/internal_kv.py +++ b/python/ray/experimental/internal_kv.py @@ -20,7 +20,7 @@ def _internal_kv_get(key: Union[str, bytes]) -> bytes: @client_mode_hook def _internal_kv_put(key: Union[str, bytes], value: Union[str, bytes], - overwrite: bool = False) -> bool: + overwrite: bool = True) -> bool: """Globally associates a value with a given binary key. This only has an effect if the key does not already have a value. diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index d6b3a314e..c4e2ca1d2 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -125,9 +125,12 @@ class LogMonitor: log_file_paths = glob.glob(f"{self.logs_dir}/worker*[.out|.err]") # segfaults and other serious errors are logged here raylet_err_paths = glob.glob(f"{self.logs_dir}/raylet*.err") + # monitor logs are needed to report autoscaler events + monitor_log_paths = glob.glob(f"{self.logs_dir}/monitor.log") # If gcs server restarts, there can be multiple log files. gcs_err_path = glob.glob(f"{self.logs_dir}/gcs_server*.err") - for file_path in log_file_paths + raylet_err_paths + gcs_err_path: + for file_path in (log_file_paths + raylet_err_paths + gcs_err_path + + monitor_log_paths): if os.path.isfile( file_path) and file_path not in self.log_filenames: job_match = JOB_LOG_PATTERN.match(file_path) @@ -246,6 +249,8 @@ class LogMonitor: file_info.worker_pid = "raylet" elif "/gcs_server" in file_info.filename: file_info.worker_pid = "gcs_server" + elif "/monitor" in file_info.filename: + file_info.worker_pid = "autoscaler" # Record the current position in the file. file_info.file_position = file_info.file_handle.tell() diff --git a/python/ray/monitor.py b/python/ray/monitor.py index a586cc69d..fe1edad63 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -12,6 +12,7 @@ import ray from ray.autoscaler._private.autoscaler import StandardAutoscaler from ray.autoscaler._private.commands import teardown_cluster from ray.autoscaler._private.constants import AUTOSCALER_UPDATE_INTERVAL_S +from ray.autoscaler._private.event_summarizer import EventSummarizer from ray.autoscaler._private.load_metrics import LoadMetrics from ray.autoscaler._private.constants import \ AUTOSCALER_MAX_RESOURCE_DEMAND_VECTOR_SIZE @@ -101,16 +102,21 @@ class Monitor: self.raylet_id_to_ip_map = {} head_node_ip = redis_address.split(":")[0] self.load_metrics = LoadMetrics(local_ip=head_node_ip) + self.last_avail_resources = None + self.event_summarizer = EventSummarizer() if autoscaling_config: self.autoscaler = StandardAutoscaler( autoscaling_config, self.load_metrics, - prefix_cluster_info=prefix_cluster_info) + prefix_cluster_info=prefix_cluster_info, + event_summarizer=self.event_summarizer) self.autoscaling_config = autoscaling_config else: self.autoscaler = None self.autoscaling_config = None + logger.info("Monitor: Started") + def __del__(self): """Destruct the monitor object.""" # We close the pubsub client to avoid leaking file descriptors. @@ -160,20 +166,6 @@ class Monitor: except Exception: logger.exception("Error parsing resource requests") - def autoscaler_resource_request_handler(self, _, data): - """Handle a notification of a resource request for the autoscaler. - - This channel and method are only used by the manual - `ray.autoscaler.sdk.request_resources` api. - - Args: - channel: unused - data: a resource request as JSON, e.g. {"CPU": 1} - """ - - resource_request = json.loads(data) - self.load_metrics.set_resource_requests(resource_request) - def update_raylet_map(self, _append_port=False): """Updates internal raylet map. @@ -199,6 +191,7 @@ class Monitor: self.update_raylet_map() self.update_load_metrics() self.update_resource_requests() + self.update_event_summary() status = { "load_metrics_report": self.load_metrics.summary()._asdict() } @@ -210,6 +203,10 @@ class Monitor: status[ "autoscaler_report"] = self.autoscaler.summary()._asdict() + for msg in self.event_summarizer.summary(): + logger.info(":event_summary:{}".format(msg)) + self.event_summarizer.clear() + as_json = json.dumps(status) if _internal_kv_initialized(): _internal_kv_put( @@ -219,6 +216,21 @@ class Monitor: # round of messages. time.sleep(AUTOSCALER_UPDATE_INTERVAL_S) + def update_event_summary(self): + """Report the current size of the cluster. + + To avoid log spam, only cluster size changes (CPU or GPU count change) + are reported to the event summarizer. The event summarizer will report + only the latest cluster size per batch. + """ + avail_resources = self.load_metrics.resources_avail_summary() + if avail_resources != self.last_avail_resources: + self.event_summarizer.add( + "Resized to {}.", # e.g., Resized to 100 CPUs, 4 GPUs. + quantity=avail_resources, + aggregate=lambda old, new: new) + self.last_avail_resources = avail_resources + def destroy_autoscaler_workers(self): """Cleanup the autoscaler, in case of an exception in the run() method. diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index abec214dc..f0f16318a 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -735,6 +735,12 @@ class AutoscalingTest(unittest.TestCase): # Eventually reaches steady state self.waitForNodes(5) + # Check the outdated node removal event is generated. + autoscaler.update() + events = autoscaler.event_summarizer.summary() + assert ("Removing 10 nodes of type " + "ray-legacy-worker-node-type (outdated)." in events), events + def testDynamicScaling(self): config_path = self.write_config(SMALL_CLUSTER) self.provider = MockProvider() @@ -781,6 +787,12 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes( 10, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + # Check the launch failure event is generated. + autoscaler.update() + events = autoscaler.event_summarizer.summary() + assert ("Removing 1 nodes of type " + "ray-legacy-worker-node-type (max workers)." in events), events + def testInitialWorkers(self): """initial_workers is deprecated, this tests that it is ignored.""" config = SMALL_CLUSTER.copy() @@ -1287,6 +1299,13 @@ class AutoscalingTest(unittest.TestCase): # The failed nodes might have been already terminated by autoscaler assert len(self.provider.non_terminated_nodes({})) < 2 + # Check the launch failure event is generated. + autoscaler.update() + events = autoscaler.event_summarizer.summary() + assert ( + "Removing 2 nodes of type " + "ray-legacy-worker-node-type (launch failed)." in events), events + def testConfiguresOutdatedNodes(self): from ray.autoscaler._private.cli_logger import cli_logger @@ -1408,6 +1427,13 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitForNodes(1, tag_filters={TAG_RAY_NODE_KIND: NODE_KIND_WORKER}) + # Check add/remove events. + events = autoscaler.event_summarizer.summary() + assert ("Adding 5 nodes of type " + "ray-legacy-worker-node-type." in events), events + assert ("Removing 4 nodes of type " + "ray-legacy-worker-node-type (idle)." in events), events + def testTargetUtilizationFraction(self): config = SMALL_CLUSTER.copy() config["min_workers"] = 0 @@ -1502,6 +1528,13 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() self.waitFor(lambda: len(runner.calls) > num_calls, num_retries=150) + # Check the node removal event is generated. + autoscaler.update() + events = autoscaler.event_summarizer.summary() + assert ("Restarting 1 nodes of type " + "ray-legacy-worker-node-type (lost contact with raylet)." in + events), events + def testExternalNodeScaler(self): config = SMALL_CLUSTER.copy() config["provider"] = { diff --git a/python/ray/tests/test_autoscaling_policy.py b/python/ray/tests/test_autoscaling_policy.py index e321ec73d..859109f82 100644 --- a/python/ray/tests/test_autoscaling_policy.py +++ b/python/ray/tests/test_autoscaling_policy.py @@ -491,6 +491,12 @@ class AutoscalingPolicyTest(unittest.TestCase): # TODO (Alex): Not clear what's actually worth asserting here. assert simulator.node_costs() + # Check event logs contain add/remove node events. + assert any("Adding" in x + for x in simulator.autoscaler.event_summarizer.summary()) + assert any("Removing" in x + for x in simulator.autoscaler.event_summarizer.summary()) + def testManyActors(self): config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG) config_path = self.write_config(config) @@ -518,6 +524,12 @@ class AutoscalingPolicyTest(unittest.TestCase): assert time < 650 + # Check event logs contain add/remove node events. + assert any("Adding" in x + for x in simulator.autoscaler.event_summarizer.summary()) + assert any("Removing" in x + for x in simulator.autoscaler.event_summarizer.summary()) + def testManyPlacementGroups(self): config = copy.deepcopy(SAMPLE_CLUSTER_CONFIG) config_path = self.write_config(config) @@ -584,6 +596,12 @@ class AutoscalingPolicyTest(unittest.TestCase): assert time < 630 + # Check event logs contain add/remove node events. + assert any("Adding" in x + for x in simulator.autoscaler.event_summarizer.summary()) + assert any("Removing" in x + for x in simulator.autoscaler.event_summarizer.summary()) + if __name__ == "__main__": import sys diff --git a/python/ray/tests/test_multi_node_2.py b/python/ray/tests/test_multi_node_2.py index 9e40b740c..b3e739e64 100644 --- a/python/ray/tests/test_multi_node_2.py +++ b/python/ray/tests/test_multi_node_2.py @@ -75,9 +75,13 @@ def setup_monitor(address): def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): request_resources(num_cpus=42) + # Disable event clearing for test. + monitor.event_summarizer.clear = lambda *a: None + while True: monitor.update_load_metrics() monitor.update_resource_requests() + monitor.update_event_summary() resource_usage = monitor.load_metrics._get_resource_usage() # Check resource request propagation. @@ -113,6 +117,9 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30): raise ValueError("Timeout. {} != {}".format( resource_usage, expected_resource_usage)) + # Sanity check we emitted a resize event. + assert any("Resized to" in x for x in monitor.event_summarizer.summary()) + return resource_usage diff --git a/python/ray/worker.py b/python/ray/worker.py index a5a28024a..350bbc649 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -14,8 +14,10 @@ import sys import threading import time import traceback +from typing import Any, Dict, List, Iterator # Ray modules +from ray.autoscaler._private.constants import AUTOSCALER_EVENTS import ray.cloudpickle as pickle import ray.gcs_utils import ray.memory_monitor as memory_monitor @@ -925,25 +927,93 @@ def print_to_stdstream(data): print_worker_logs(data, print_file) -def print_worker_logs(data, print_file): - def color_for(data): +# Start time of this process, used for relative time logs. +t0 = time.time() +autoscaler_log_fyi_printed = False + + +def filter_autoscaler_events(lines: List[str]) -> Iterator[str]: + """Given raw log lines from the monitor, return only autoscaler events. + + Autoscaler events are denoted by the ":event_summary:" magic token. + """ + global autoscaler_log_fyi_printed + + if not AUTOSCALER_EVENTS: + return + + # Print out autoscaler events only, ignoring other messages. + for line in lines: + if ":event_summary:" in line: + if not autoscaler_log_fyi_printed: + yield ("Tip: use `ray status` to view detailed " + "autoscaling status. To disable autoscaler event " + "messages, you can set AUTOSCALER_EVENTS=0.") + autoscaler_log_fyi_printed = True + # The event text immediately follows the ":event_summary:" + # magic token. + yield line.split(":event_summary:")[1] + + +def time_string() -> str: + """Return the relative time from the start of this job. + + For example, 15m30s. + """ + delta = time.time() - t0 + hours = 0 + minutes = 0 + while delta > 3600: + hours += 1 + delta -= 3600 + while delta > 60: + minutes += 1 + delta -= 60 + output = "" + if hours: + output += "{}h".format(hours) + if minutes: + output += "{}m".format(minutes) + output += "{}s".format(int(delta)) + return output + + +def print_worker_logs(data: Dict[str, str], print_file: Any): + def prefix_for(data: Dict[str, str]) -> str: + """The PID prefix for this log line.""" + if data["pid"] in ["autoscaler", "raylet"]: + return "" + else: + return "pid=" + + def color_for(data: Dict[str, str]) -> str: + """The color for this log line.""" if data["pid"] == "raylet": return colorama.Fore.YELLOW + elif data["pid"] == "autoscaler": + return colorama.Style.BRIGHT + colorama.Fore.CYAN else: return colorama.Fore.CYAN + if data["pid"] == "autoscaler": + pid = "{} +{}".format(data["pid"], time_string()) + lines = filter_autoscaler_events(data["lines"]) + else: + pid = data["pid"] + lines = data["lines"] + if data["ip"] == data["localhost"]: - for line in data["lines"]: + for line in lines: print( - "{}{}(pid={}){} {}".format(colorama.Style.DIM, color_for(data), - data["pid"], - colorama.Style.RESET_ALL, line), + "{}{}({}{}){} {}".format(colorama.Style.DIM, color_for(data), + prefix_for(data), pid, + colorama.Style.RESET_ALL, line), file=print_file) else: - for line in data["lines"]: + for line in lines: print( - "{}{}(pid={}, ip={}){} {}".format( - colorama.Style.DIM, color_for(data), data["pid"], + "{}{}({}{}, ip={}){} {}".format( + colorama.Style.DIM, color_for(data), prefix_for(data), pid, data["ip"], colorama.Style.RESET_ALL, line), file=print_file)