From 7af6c69672e0115739e14933fcb3e3431d53015a Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 29 Jun 2020 12:15:41 -0700
Subject: [PATCH] [Test] Cluster util fix (#8929)

Cluster.remove_node now refuses to remove the node that the current
driver is connected to: when the target node's raylet socket name
matches that of ray.worker._global_node, it calls ray.shutdown() and
raises ValueError instead of killing the node's processes.

test_actor_restart_on_node_failure is updated accordingly: the actor
node is added after ray.init(), its handle is kept, and that exact node
is removed instead of the last entry of get_non_head_nodes(cluster).

---
 python/ray/cluster_utils.py             | 10 ++++++++++
 python/ray/tests/test_actor_failures.py |  9 +++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py
index 1892ae308..1b1b3bc24 100644
--- a/python/ray/cluster_utils.py
+++ b/python/ray/cluster_utils.py
@@ -128,6 +128,16 @@ class Cluster:
             node (Node): Worker node of which all associated processes
                 will be removed.
         """
+        global_node = ray.worker._global_node
+        if global_node is not None:
+            if node._raylet_socket_name == global_node._raylet_socket_name:
+                ray.shutdown()
+                raise ValueError(
+                    "Removing a node that is connected to this Ray client "
+                    "is not allowed because it will break the driver. "
+                    "You can use the get_other_nodes utility to avoid removing"
+                    " a node that the Ray client is connected to.")
+
         if self.head_node == node:
             self.head_node.kill_all_processes(
                 check_alive=False, allow_graceful=allow_graceful)
diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py
index d28cc7f18..34d2ad44d 100644
--- a/python/ray/tests/test_actor_failures.py
+++ b/python/ray/tests/test_actor_failures.py
@@ -17,7 +17,6 @@ from ray.test_utils import (
     wait_for_errors,
     wait_for_pid_to_exit,
     generate_internal_config_map,
-    get_non_head_nodes,
     get_other_nodes,
 )
 
@@ -288,11 +287,13 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
     cluster = ray_start_cluster
     # Head node with no resources.
     cluster.add_node(num_cpus=0, _internal_config=config)
-    # Node to place the actor.
-    cluster.add_node(num_cpus=1, _internal_config=config)
     cluster.wait_for_nodes()
     ray.init(address=cluster.address)
 
+    # Node to place the actor.
+    actor_node = cluster.add_node(num_cpus=1, _internal_config=config)
+    cluster.wait_for_nodes()
+
     @ray.remote(num_cpus=1, max_restarts=1, max_task_retries=-1)
     class RestartableActor:
         """An actor that will be reconstructed at most once."""
@@ -311,7 +312,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
         ray.get(actor.ready.remote())
     results = [actor.increase.remote() for _ in range(100)]
     # Kill actor node, while the above task is still being executed.
-    cluster.remove_node(get_non_head_nodes(cluster)[-1])
+    cluster.remove_node(actor_node)
     cluster.add_node(num_cpus=1, _internal_config=config)
     cluster.wait_for_nodes()
     # Check that none of the tasks failed and the actor is restarted.