diff --git a/ci/jenkins_tests/run_multi_node_tests.sh b/ci/jenkins_tests/run_multi_node_tests.sh index 90bf860d2..e405f1bee 100755 --- a/ci/jenkins_tests/run_multi_node_tests.sh +++ b/ci/jenkins_tests/run_multi_node_tests.sh @@ -461,6 +461,7 @@ python3 $ROOT_DIR/multi_node_docker_test.py \ --shm-size=60G \ --test-script=/ray/test/jenkins_tests/multi_node_tests/large_memory_test.py + docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ pytest /ray/python/ray/tune/test/cluster_tests.py @@ -528,3 +529,4 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \ python /ray/python/ray/tune/examples/skopt_example.py \ --smoke-test + diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index dce68d1f7..a3d692314 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -155,6 +155,8 @@ class LoadMetrics(object): self.last_heartbeat_time_by_ip[ip] = now def mark_active(self, ip): + assert ip is not None, "IP should be known at this time" + logger.info("Node {} is newly setup, treating as active".format(ip)) self.last_heartbeat_time_by_ip[ip] = time.time() def prune_active_ips(self, active_ips): @@ -177,10 +179,14 @@ class LoadMetrics(object): prune(self.last_used_time_by_ip) prune(self.static_resources_by_ip) prune(self.dynamic_resources_by_ip) + prune(self.last_heartbeat_time_by_ip) def approx_workers_used(self): return self._info()["NumNodesUsed"] + def num_workers_connected(self): + return self._info()["NumNodesConnected"] + def info_string(self): return ", ".join( ["{}={}".format(k, v) for k, v in sorted(self._info().items())]) @@ -210,6 +216,13 @@ class LoadMetrics(object): heartbeat_times = [ now - t for t in self.last_heartbeat_time_by_ip.values() ] + most_delayed_heartbeats = sorted( + list(self.last_heartbeat_time_by_ip.items()), + key=lambda pair: pair[1])[:5] + most_delayed_heartbeats = { + ip: (now - t) + for ip, t in most_delayed_heartbeats + } return { "ResourceUsage": ", ".join([ "{}/{} {}".format( @@ -227,6 +240,7 @@ class LoadMetrics(object): int(np.min(heartbeat_times)) if heartbeat_times else -1, int(np.mean(heartbeat_times)) if heartbeat_times else -1, int(np.max(heartbeat_times)) if heartbeat_times else -1), + "MostDelayedHeartbeats": most_delayed_heartbeats, } @@ -239,7 +253,7 @@ class NodeLauncher(threading.Thread): def _launch_node(self, config, count): tag_filters = {TAG_RAY_NODE_TYPE: "worker"} - before = self.provider.nodes(tag_filters=tag_filters) + before = self.provider.non_terminated_nodes(tag_filters=tag_filters) launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"]) self.provider.create_node( config["worker_nodes"], { @@ -249,7 +263,7 @@ class NodeLauncher(threading.Thread): TAG_RAY_NODE_STATUS: "uninitialized", TAG_RAY_LAUNCH_CONFIG: launch_hash, }, count) - after = self.provider.nodes(tag_filters=tag_filters) + after = self.provider.non_terminated_nodes(tag_filters=tag_filters) if set(after).issubset(before): logger.error("NodeLauncher: " "No new nodes reported after node creation") @@ -430,7 +444,8 @@ class StandardAutoscaler(object): self.launch_new_node(num_launches) nodes = self.workers() self.log_info_string(nodes) - else: + elif self.load_metrics.num_workers_connected() >= target_workers: + logger.info("Ending bringup phase") self.bringup = False # Process any completed updates @@ -492,15 +507,15 @@ class StandardAutoscaler(object): "Error parsing config.") def target_num_workers(self): - initial_workers = self.config["initial_workers"] + target_frac = self.config["target_utilization_fraction"] + cur_used = self.load_metrics.approx_workers_used() + ideal_num_nodes = int(np.ceil(cur_used / float(target_frac))) + ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node if self.bringup: - ideal_num_workers = initial_workers - else: - target_frac = self.config["target_utilization_fraction"] - cur_used = self.load_metrics.approx_workers_used() - ideal_num_nodes = int(np.ceil(cur_used / float(target_frac))) - ideal_num_workers = ideal_num_nodes - 1 # subtract 1 for head node + ideal_num_workers = max(ideal_num_workers, + self.config["initial_workers"]) + return min(self.config["max_workers"], max(self.config["min_workers"], ideal_num_workers)) @@ -603,7 +618,8 @@ class StandardAutoscaler(object): self.launch_queue.put((config, count)) def workers(self): - return self.provider.nodes(tag_filters={TAG_RAY_NODE_TYPE: "worker"}) + return self.provider.non_terminated_nodes( + tag_filters={TAG_RAY_NODE_TYPE: "worker"}) def log_info_string(self, nodes): logger.info("StandardAutoscaler: {}".format(self.info_string(nodes))) diff --git a/python/ray/autoscaler/aws/node_provider.py b/python/ray/autoscaler/aws/node_provider.py index 5bd3a74e5..8a05a4f7f 100644 --- a/python/ray/autoscaler/aws/node_provider.py +++ b/python/ray/autoscaler/aws/node_provider.py @@ -93,7 +93,7 @@ class AWSNodeProvider(NodeProvider): if self.tag_cache_kill_event.is_set(): return - def nodes(self, tag_filters): + def non_terminated_nodes(self, tag_filters): # Note that these filters are acceptable because they are set on # node initialization, and so can never be sitting in the cache. tag_filters = to_aws_format(tag_filters) @@ -230,12 +230,16 @@ class AWSNodeProvider(NodeProvider): def _get_node(self, node_id): """Refresh and get info for this node, updating the cache.""" - self.nodes({}) # Side effect: fetches and caches the node. + self.non_terminated_nodes({}) # Side effect: updates cache - assert node_id in self.cached_nodes, "Invalid instance id {}".format( - node_id) + if node_id in self.cached_nodes: + return self.cached_nodes[node_id] - return self.cached_nodes[node_id] + # Node not in {pending, running} -- retry with a point query. This + # usually means the node was recently preempted or terminated. + matches = list(self.ec2.instances.filter(InstanceIds=[node_id])) + assert len(matches) == 1, "Invalid instance id {}".format(node_id) + return matches[0] def _get_cached_node(self, node_id): """Return node info from cache if possible, otherwise fetches it.""" diff --git a/python/ray/autoscaler/commands.py b/python/ray/autoscaler/commands.py index 6032d814f..223460bc2 100644 --- a/python/ray/autoscaler/commands.py +++ b/python/ray/autoscaler/commands.py @@ -90,13 +90,13 @@ def teardown_cluster(config_file, yes, workers_only, override_cluster_name): A = [] else: A = [ - node_id for node_id in provider.nodes({ + node_id for node_id in provider.non_terminated_nodes({ TAG_RAY_NODE_TYPE: "head" }) ] A += [ - node_id for node_id in provider.nodes({ + node_id for node_id in provider.non_terminated_nodes({ TAG_RAY_NODE_TYPE: "worker" }) ] @@ -128,7 +128,7 @@ def kill_node(config_file, yes, override_cluster_name): provider = get_node_provider(config["provider"], config["cluster_name"]) try: - nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) + nodes = provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) node = random.choice(nodes) logger.info("kill_node: Terminating worker {}".format(node)) @@ -165,7 +165,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, head_node_tags = { TAG_RAY_NODE_TYPE: "head", } - nodes = provider.nodes(head_node_tags) + nodes = provider.non_terminated_nodes(head_node_tags) if len(nodes) > 0: head_node = nodes[0] else: @@ -192,7 +192,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, config["cluster_name"]) provider.create_node(config["head_node"], head_node_tags, 1) - nodes = provider.nodes(head_node_tags) + nodes = provider.non_terminated_nodes(head_node_tags) assert len(nodes) == 1, "Failed to create head node." head_node = nodes[0] @@ -248,7 +248,7 @@ def get_or_create_head_node(config, config_file, no_restart, restart_only, yes, updater.join() # Refresh the node cache so we see the external ip if available - provider.nodes(head_node_tags) + provider.non_terminated_nodes(head_node_tags) if config.get("provider", {}).get("use_internal_ips", False) is True: head_node_ip = provider.internal_ip(head_node) @@ -484,7 +484,7 @@ def get_worker_node_ips(config_file, override_cluster_name): provider = get_node_provider(config["provider"], config["cluster_name"]) try: - nodes = provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) + nodes = provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) if config.get("provider", {}).get("use_internal_ips", False) is True: return [provider.internal_ip(node) for node in nodes] @@ -503,7 +503,7 @@ def _get_head_node(config, head_node_tags = { TAG_RAY_NODE_TYPE: "head", } - nodes = provider.nodes(head_node_tags) + nodes = provider.non_terminated_nodes(head_node_tags) finally: provider.cleanup() diff --git a/python/ray/autoscaler/gcp/node_provider.py b/python/ray/autoscaler/gcp/node_provider.py index c79014cc6..7d6afa016 100644 --- a/python/ray/autoscaler/gcp/node_provider.py +++ b/python/ray/autoscaler/gcp/node_provider.py @@ -53,7 +53,7 @@ class GCPNodeProvider(NodeProvider): # excessive DescribeInstances requests. self.cached_nodes = {} - def nodes(self, tag_filters): + def non_terminated_nodes(self, tag_filters): with self.lock: if tag_filters: label_filter_expr = "(" + " AND ".join([ @@ -223,12 +223,19 @@ class GCPNodeProvider(NodeProvider): return result def _get_node(self, node_id): - self.nodes({}) # Side effect: fetches and caches the node. + self.non_terminated_nodes({}) # Side effect: updates cache - assert node_id in self.cached_nodes, "Invalid instance id {}".format( - node_id) + with self.lock: + if node_id in self.cached_nodes: + return self.cached_nodes[node_id] - return self.cached_nodes[node_id] + instance = self.compute.instances().get( + project=self.provider_config["project_id"], + zone=self.provider_config["availability_zone"], + instance=node_id, + ).execute() + + return instance def _get_cached_node(self, node_id): if node_id in self.cached_nodes: diff --git a/python/ray/autoscaler/local/node_provider.py b/python/ray/autoscaler/local/node_provider.py index 3e8d81939..cdf43db98 100644 --- a/python/ray/autoscaler/local/node_provider.py +++ b/python/ray/autoscaler/local/node_provider.py @@ -85,7 +85,7 @@ class LocalNodeProvider(NodeProvider): "/tmp/cluster-{}.state".format(cluster_name), provider_config) - def nodes(self, tag_filters): + def non_terminated_nodes(self, tag_filters): workers = self.state.get() matching_ips = [] for worker_ip, info in workers.items(): diff --git a/python/ray/autoscaler/node_provider.py b/python/ray/autoscaler/node_provider.py index 0b295ae6e..3e23208c9 100644 --- a/python/ray/autoscaler/node_provider.py +++ b/python/ray/autoscaler/node_provider.py @@ -132,7 +132,7 @@ class NodeProvider(object): self.provider_config = provider_config self.cluster_name = cluster_name - def nodes(self, tag_filters): + def non_terminated_nodes(self, tag_filters): """Return a list of node ids filtered by the specified tags dict. This list must not include terminated nodes. For performance reasons, @@ -141,7 +141,7 @@ class NodeProvider(object): nodes() must be called again to refresh results. Examples: - >>> provider.nodes({TAG_RAY_NODE_TYPE: "worker"}) + >>> provider.non_terminated_nodes({TAG_RAY_NODE_TYPE: "worker"}) ["node-1", "node-2"] """ raise NotImplementedError diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index a33290d5d..3760236a1 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -56,7 +56,7 @@ class MockProvider(NodeProvider): self.ready_to_create = threading.Event() self.ready_to_create.set() - def nodes(self, tag_filters): + def non_terminated_nodes(self, tag_filters): if self.throw: raise Exception("oops") return [ @@ -193,7 +193,7 @@ class AutoscalingTest(unittest.TestCase): def waitForNodes(self, expected, comparison=None, tag_filters={}): MAX_ITER = 50 for i in range(MAX_ITER): - n = len(self.provider.nodes(tag_filters)) + n = len(self.provider.non_terminated_nodes(tag_filters)) if comparison is None: comparison = self.assertEqual try: @@ -260,7 +260,7 @@ class AutoscalingTest(unittest.TestCase): self.provider = MockProvider() autoscaler = StandardAutoscaler( config_path, LoadMetrics(), max_failures=0, update_interval_s=0) - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(2) autoscaler.update() @@ -347,13 +347,13 @@ class AutoscalingTest(unittest.TestCase): max_concurrent_launches=5, max_failures=0, update_interval_s=0) - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 # Update will try to create, but will block until we set the flag self.provider.ready_to_create.clear() autoscaler.update() assert autoscaler.num_launches_pending.value == 2 - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 # Set the flag, check it updates self.provider.ready_to_create.set() @@ -365,7 +365,7 @@ class AutoscalingTest(unittest.TestCase): new_config["max_workers"] = 1 self.write_config(new_config) autoscaler.update() - assert len(self.provider.nodes({})) == 1 + assert len(self.provider.non_terminated_nodes({})) == 1 def testDelayedLaunchWithFailure(self): config = SMALL_CLUSTER.copy() @@ -380,7 +380,7 @@ class AutoscalingTest(unittest.TestCase): max_concurrent_launches=8, max_failures=0, update_interval_s=0) - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 # update() should launch a wave of 5 nodes (max_launch_batch) # Force this first wave to block. @@ -394,7 +394,7 @@ class AutoscalingTest(unittest.TestCase): waiters = rtc1._Event__cond._Condition__waiters self.waitFor(lambda: len(waiters) == 1) assert autoscaler.num_launches_pending.value == 5 - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 # Call update() to launch a second wave of 3 nodes, # as 5 + 3 = 8 = max_concurrent_launches. @@ -410,7 +410,7 @@ class AutoscalingTest(unittest.TestCase): self.provider.fail_creates = True rtc1.set() self.waitFor(lambda: autoscaler.num_launches_pending.value == 0) - assert len(self.provider.nodes({})) == 3 + assert len(self.provider.non_terminated_nodes({})) == 3 # Retry the first wave, allowing it to succeed this time self.provider.fail_creates = False @@ -443,7 +443,7 @@ class AutoscalingTest(unittest.TestCase): # not updated yet # note that node termination happens in the main thread, so # we do not need to add any delay here before checking - assert len(self.provider.nodes({})) == 2 + assert len(self.provider.non_terminated_nodes({})) == 2 assert autoscaler.num_launches_pending.value == 0 def testLaunchConfigChange(self): @@ -484,7 +484,7 @@ class AutoscalingTest(unittest.TestCase): autoscaler.update() time.sleep(0.1) assert autoscaler.num_launches_pending.value == 0 - assert len(self.provider.nodes({})) == 2 + assert len(self.provider.non_terminated_nodes({})) == 2 # New a good config again new_config = SMALL_CLUSTER.copy() @@ -515,7 +515,7 @@ class AutoscalingTest(unittest.TestCase): self.waitForNodes(2) for node in self.provider.mock_nodes.values(): node.state = "terminated" - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(2) @@ -591,12 +591,12 @@ class AutoscalingTest(unittest.TestCase): lm = LoadMetrics() autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, update_interval_s=0) - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() self.waitForNodes(1) autoscaler.update() assert autoscaler.num_launches_pending.value == 0 - assert len(self.provider.nodes({})) == 1 + assert len(self.provider.non_terminated_nodes({})) == 1 # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() @@ -613,19 +613,19 @@ class AutoscalingTest(unittest.TestCase): lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}) autoscaler.update() assert autoscaler.num_launches_pending.value == 0 - assert len(self.provider.nodes({})) == 5 + assert len(self.provider.non_terminated_nodes({})) == 5 # Scales down as nodes become unused lm.last_used_time_by_ip["172.0.0.0"] = 0 lm.last_used_time_by_ip["172.0.0.1"] = 0 autoscaler.update() assert autoscaler.num_launches_pending.value == 0 - assert len(self.provider.nodes({})) == 3 + assert len(self.provider.non_terminated_nodes({})) == 3 lm.last_used_time_by_ip["172.0.0.2"] = 0 lm.last_used_time_by_ip["172.0.0.3"] = 0 autoscaler.update() assert autoscaler.num_launches_pending.value == 0 - assert len(self.provider.nodes({})) == 1 + assert len(self.provider.non_terminated_nodes({})) == 1 def testDontScaleBelowTarget(self): config = SMALL_CLUSTER.copy() @@ -637,10 +637,10 @@ class AutoscalingTest(unittest.TestCase): lm = LoadMetrics() autoscaler = StandardAutoscaler( config_path, lm, max_failures=0, update_interval_s=0) - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 autoscaler.update() assert autoscaler.num_launches_pending.value == 0 - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() @@ -654,12 +654,12 @@ class AutoscalingTest(unittest.TestCase): lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}) lm.last_used_time_by_ip["172.0.0.0"] = 0 autoscaler.update() - assert len(self.provider.nodes({})) == 1 + assert len(self.provider.non_terminated_nodes({})) == 1 # Reduce load on head => target nodes = 1 => target workers = 0 lm.update(local_ip, {"CPU": 2}, {"CPU": 1}) autoscaler.update() - assert len(self.provider.nodes({})) == 0 + assert len(self.provider.non_terminated_nodes({})) == 0 @flaky(max_runs=4) def testRecoverUnhealthyWorkers(self):