mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 14:06:42 +08:00
[Test] Cluster util fix (#8929)
This commit is contained in:
@@ -128,6 +128,16 @@ class Cluster:
|
||||
node (Node): Worker node of which all associated processes
|
||||
will be removed.
|
||||
"""
|
||||
global_node = ray.worker._global_node
|
||||
if global_node is not None:
|
||||
if node._raylet_socket_name == global_node._raylet_socket_name:
|
||||
ray.shutdown()
|
||||
raise ValueError(
|
||||
"Removing a node that is connected to this Ray client "
|
||||
"is not allowed because it will break the driver."
|
||||
"You can use the get_other_node utility to avoid removing"
|
||||
"a node that the Ray client is connected.")
|
||||
|
||||
if self.head_node == node:
|
||||
self.head_node.kill_all_processes(
|
||||
check_alive=False, allow_graceful=allow_graceful)
|
||||
|
||||
@@ -17,7 +17,6 @@ from ray.test_utils import (
|
||||
wait_for_errors,
|
||||
wait_for_pid_to_exit,
|
||||
generate_internal_config_map,
|
||||
get_non_head_nodes,
|
||||
get_other_nodes,
|
||||
)
|
||||
|
||||
@@ -288,11 +287,13 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
|
||||
cluster = ray_start_cluster
|
||||
# Head node with no resources.
|
||||
cluster.add_node(num_cpus=0, _internal_config=config)
|
||||
# Node to place the actor.
|
||||
cluster.add_node(num_cpus=1, _internal_config=config)
|
||||
cluster.wait_for_nodes()
|
||||
ray.init(address=cluster.address)
|
||||
|
||||
# Node to place the actor.
|
||||
actor_node = cluster.add_node(num_cpus=1, _internal_config=config)
|
||||
cluster.wait_for_nodes()
|
||||
|
||||
@ray.remote(num_cpus=1, max_restarts=1, max_task_retries=-1)
|
||||
class RestartableActor:
|
||||
"""An actor that will be reconstructed at most once."""
|
||||
@@ -311,7 +312,7 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
|
||||
ray.get(actor.ready.remote())
|
||||
results = [actor.increase.remote() for _ in range(100)]
|
||||
# Kill actor node, while the above task is still being executed.
|
||||
cluster.remove_node(get_non_head_nodes(cluster)[-1])
|
||||
cluster.remove_node(actor_node)
|
||||
cluster.add_node(num_cpus=1, _internal_config=config)
|
||||
cluster.wait_for_nodes()
|
||||
# Check that none of the tasks failed and the actor is restarted.
|
||||
|
||||
Reference in New Issue
Block a user