[GCS] Fix actor task hang when its owner exits before local dependencies resolved (#8045)

This commit is contained in:
ZhuSenlin
2020-07-27 10:56:52 +08:00
committed by GitHub
parent f3fdb5c5db
commit a269ae9bc4
29 changed files with 871 additions and 236 deletions
+20
View File
@@ -8,6 +8,7 @@ try:
except ImportError:
pytest_timeout = None
import sys
import datetime
import ray
import ray.test_utils
@@ -816,5 +817,24 @@ def test_inherit_actor_from_class(ray_start_regular):
assert ray.get(actor.g.remote(5)) == 6
@pytest.mark.skip(
"This test is just used to print the latency of creating 100 actors.")
def test_actor_creation_latency(ray_start_regular):
# This test is just used to test the latency of actor creation.
@ray.remote
class Actor:
def get_value(self):
return 1
start = datetime.datetime.now()
actor_handles = [Actor.remote() for _ in range(100)]
actor_create_time = datetime.datetime.now()
for actor_handle in actor_handles:
ray.get(actor_handle.get_value.remote())
end = datetime.datetime.now()
print("actor_create_time_consume = {}, total_time_consume = {}".format(
actor_create_time - start, end - start))
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
+3 -3
View File
@@ -719,7 +719,7 @@ def test_detached_actor_cleanup(ray_start_regular):
actor_status = ray.actors(actor_id=detached_actor._actor_id.hex())
max_wait_time = 10
wait_time = 0
while actor_status["State"] != 3:
while actor_status["State"] != ray.gcs_utils.ActorTableData.DEAD:
actor_status = ray.actors(actor_id=detached_actor._actor_id.hex())
time.sleep(1.0)
wait_time += 1
@@ -753,7 +753,7 @@ ray.kill(detached_actor)
actor_status = ray.actors(actor_id=detached_actor._actor_id.hex())
max_wait_time = 10
wait_time = 0
while actor_status["State"] != 3:
while actor_status["State"] != ray.gcs_utils.ActorTableData.DEAD:
actor_status = ray.actors(actor_id=detached_actor._actor_id.hex())
time.sleep(1.0)
wait_time += 1
@@ -798,7 +798,7 @@ def test_detached_actor_cleanup_due_to_failure(ray_start_cluster):
actor_status = ray.actors(actor_id=handle._actor_id.hex())
max_wait_time = 10
wait_time = 0
while actor_status["State"] != 3:
while actor_status["State"] != ray.gcs_utils.ActorTableData.DEAD:
actor_status = ray.actors(actor_id=handle._actor_id.hex())
time.sleep(1.0)
wait_time += 1
+137
View File
@@ -18,6 +18,7 @@ from ray.test_utils import (
wait_for_pid_to_exit,
generate_internal_config_map,
get_other_nodes,
SignalActor,
)
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@@ -891,6 +892,142 @@ def test_ray_wait_dead_actor(ray_start_cluster):
assert wait_for_condition(lambda: ray.get(parent_actor.wait.remote()))
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
"num_nodes": 1,
}], indirect=True)
def test_actor_owner_worker_dies_before_dependency_ready(ray_start_cluster):
"""Test actor owner worker dies before local dependencies are resolved.
This test verifies the scenario where owner worker
has failed before actor dependencies are resolved.
Reference: https://github.com/ray-project/ray/pull/8045
"""
@ray.remote
class Actor:
def __init__(self, dependency):
print("actor: {}".format(os.getpid()))
self.dependency = dependency
def f(self):
return self.dependency
@ray.remote
class Owner:
def get_pid(self):
return os.getpid()
def create_actor(self, caller_handle):
s = SignalActor.remote()
# Create an actor which depends on an object that can never be
# resolved.
actor_handle = Actor.remote(s.wait.remote())
pid = os.getpid()
signal_handle = SignalActor.remote()
caller_handle.call.remote(pid, signal_handle, actor_handle)
# Wait until the `Caller` start executing the remote `call` method.
ray.get(signal_handle.wait.remote())
# exit
os._exit(0)
@ray.remote
class Caller:
def call(self, owner_pid, signal_handle, actor_handle):
# Notify the `Owner` that the `Caller` is executing the remote
# `call` method.
ray.get(signal_handle.send.remote())
# Wait for the `Owner` to exit.
wait_for_pid_to_exit(owner_pid)
oid = actor_handle.f.remote()
# It will hang without location resolution protocol.
ray.get(oid)
def hang(self):
return True
owner = Owner.remote()
owner_pid = ray.get(owner.get_pid.remote())
caller = Caller.remote()
owner.create_actor.remote(caller)
# Wait for the `Owner` to exit.
wait_for_pid_to_exit(owner_pid)
# It will hang here if location is not properly resolved.
assert (wait_for_condition(lambda: ray.get(caller.hang.remote())))
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 3,
"num_nodes": 1,
}], indirect=True)
def test_actor_owner_node_dies_before_dependency_ready(ray_start_cluster):
"""Test actor owner node dies before local dependencies are resolved.
This test verifies the scenario where owner node
has failed before actor dependencies are resolved.
Reference: https://github.com/ray-project/ray/pull/8045
"""
@ray.remote
class Actor:
def __init__(self, dependency):
print("actor: {}".format(os.getpid()))
self.dependency = dependency
def f(self):
return self.dependency
# Make sure it is scheduled in the second node.
@ray.remote(resources={"node": 1}, num_cpus=1)
class Owner:
def get_pid(self):
return os.getpid()
def create_actor(self, caller_handle):
s = SignalActor.remote()
# Create an actor which depends on an object that can never be
# resolved.
actor_handle = Actor.remote(s.wait.remote())
pid = os.getpid()
signal_handle = SignalActor.remote()
caller_handle.call.remote(pid, signal_handle, actor_handle)
# Wait until the `Caller` start executing the remote `call` method.
ray.get(signal_handle.wait.remote())
@ray.remote
class Caller:
def call(self, owner_pid, signal_handle, actor_handle):
# Notify the `Owner` that the `Caller` is executing the remote
# `call` method.
ray.get(signal_handle.send.remote())
# Wait for the `Owner` to exit.
wait_for_pid_to_exit(owner_pid)
oid = actor_handle.f.remote()
# It will hang without location resolution protocol.
ray.get(oid)
def hang(self):
return True
cluster = ray_start_cluster
node_to_be_broken = cluster.add_node(num_cpus=1, resources={"node": 1})
owner = Owner.remote()
owner_pid = ray.get(owner.get_pid.remote())
caller = Caller.remote()
owner.create_actor.remote(caller)
cluster.remove_node(node_to_be_broken)
# Wait for the `Owner` to exit.
wait_for_pid_to_exit(owner_pid)
# It will hang here if location is not properly resolved.
assert (wait_for_condition(lambda: ray.get(caller.hang.remote())))
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+4 -2
View File
@@ -123,9 +123,11 @@ def test_global_state_actor_entry(ray_start_regular):
a_actor_id = a._actor_id.hex()
b_actor_id = b._actor_id.hex()
assert ray.actors(actor_id=a_actor_id)["ActorID"] == a_actor_id
assert ray.actors(actor_id=a_actor_id)["State"] == 1
assert ray.actors(
actor_id=a_actor_id)["State"] == ray.gcs_utils.ActorTableData.ALIVE
assert ray.actors(actor_id=b_actor_id)["ActorID"] == b_actor_id
assert ray.actors(actor_id=b_actor_id)["State"] == 1
assert ray.actors(
actor_id=b_actor_id)["State"] == ray.gcs_utils.ActorTableData.ALIVE
if __name__ == "__main__":
+2 -1
View File
@@ -236,7 +236,8 @@ def test_raylet_info_endpoint(shutdown_only):
if child_actor_info["state"] == -1:
assert child_actor_info["requiredResources"]["CustomResource"] == 1
else:
assert child_actor_info["state"] == 1
assert child_actor_info[
"state"] == ray.gcs_utils.ActorTableData.ALIVE
assert len(child_actor_info["children"]) == 0
assert cpu_resources(child_actor_info) == 1