From eb025ea8cb27583e8ef6287f5654f23d1ab270ef Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 10 Sep 2020 01:03:52 -0700 Subject: [PATCH] [autoscaler] Create provider exactly once (#10703) Co-authored-by: Alex Wu --- python/ray/autoscaler/autoscaler.py | 14 +++++++++++--- python/ray/autoscaler/commands.py | 4 ++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 2767c82b7..42a166a47 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -64,6 +64,9 @@ class StandardAutoscaler: process_runner=subprocess, update_interval_s=AUTOSCALER_UPDATE_INTERVAL_S): self.config_path = config_path + # Keep this before self.reset (self.provider needs to be created + # exactly once). + self.provider = None self.reset(errors_fatal=True) self.load_metrics = load_metrics @@ -250,6 +253,7 @@ class StandardAutoscaler: self.should_update(node_id) for node_id in nodes): if node_id is not None: resources = self._node_resources(node_id) + logger.debug(f"{node_id}: Starting new thread runner.") T.append( threading.Thread( target=self.spawn_updater, @@ -295,9 +299,9 @@ class StandardAutoscaler: self.config = new_config self.runtime_hash = new_runtime_hash self.file_mounts_contents_hash = new_file_mounts_contents_hash - - self.provider = get_node_provider(self.config["provider"], - self.config["cluster_name"]) + if not self.provider: + self.provider = get_node_provider(self.config["provider"], + self.config["cluster_name"]) # Check whether we can enable the resource demand scheduler. if "available_node_types" in self.config: self.available_node_types = self.config["available_node_types"] @@ -462,6 +466,8 @@ class StandardAutoscaler: def spawn_updater(self, node_id, init_commands, ray_start_commands, node_resources, docker_config): + logger.info(f"Creating new (spawn_updater) updater thread for node" + f" {node_id}.") updater = NodeUpdaterThread( node_id=node_id, provider_config=self.config["provider"], @@ -492,6 +498,8 @@ class StandardAutoscaler: return False if self.num_failed_updates.get(node_id, 0) > 0: # TODO(ekl) retry? return False + logger.debug(f"{node_id} is not being updated and " + "passes config check (can_update=True).") return True def launch_new_node(self, count: int, node_type: Optional[str]) -> None: diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 6a9f92a1f..18327847b 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -455,7 +455,7 @@ def kill_node(config_file, yes, hard, override_cluster_name): def monitor_cluster(cluster_config_file, num_lines, override_cluster_name): """Tails the autoscaler logs of a Ray cluster.""" - cmd = "tail -n {} -f /tmp/ray/session_*/logs/monitor*".format(num_lines) + cmd = f"tail -n {num_lines} -f /tmp/ray/session_latest/logs/monitor*" exec_cluster( cluster_config_file, cmd=cmd, @@ -717,7 +717,7 @@ def get_or_create_head_node(config, logger, "get_or_create_head_node: " "Head node up-to-date, IP address is: {}", head_node_ip) - monitor_str = "tail -n 100 -f /tmp/ray/session_*/logs/monitor*" + monitor_str = "tail -n 100 -f /tmp/ray/session_latest/logs/monitor*" if override_cluster_name: modifiers = " --cluster-name={}".format( quote(override_cluster_name))