[Core] Clean up detached actors (#8759)

This commit is contained in:
SangBin Cho
2020-06-08 09:22:01 -07:00
committed by GitHub
parent 68718b33b4
commit 3388864768
6 changed files with 291 additions and 10 deletions
+138 -1
View File
@@ -668,7 +668,9 @@ def test_detached_actor(ray_start_regular):
ValueError, match="Actor name cannot be an empty string"):
DetachedActor._remote(name="")
DetachedActor._remote(name="d_actor")
d = DetachedActor._remote(name="d_actor")
assert ray.get(d.ping.remote()) == "pong"
with pytest.raises(ValueError, match="Please use a different name"):
DetachedActor._remote(name="d_actor")
@@ -697,6 +699,141 @@ ray.get(actor.ping.remote())
assert ray.get(detached_actor.ping.remote()) == "pong"
def test_detached_actor_cleanup(ray_start_regular):
    """Check that a detached actor's name can be reused after it is killed.

    The same actor name is created and killed twice from this driver, then
    once from a separate driver script, and finally once more from this
    driver. Every creation that follows a kill must succeed.

    Args:
        ray_start_regular: Fixture value; only its "redis_address" entry is
            read here, to connect the secondary driver script.

    Raises:
        AssertionError: If an actor does not reach the dead state within
            the polling budget, or if a ping does not return "pong".
    """

    @ray.remote
    class DetachedActor:
        def ping(self):
            return "pong"

    dup_actor_name = "actor"

    def wait_until_actor_dead(handle):
        """Poll the actor table until `handle`'s actor is reported dead."""
        # NOTE(review): 3 is presumably the numeric value of
        # ActorTableData.DEAD -- confirm against the actor table definition.
        dead_state = 3
        max_wait_time = 10
        actor_id = handle._actor_id.hex()
        wait_time = 0
        # Test the freshly fetched state before sleeping or timing out, so
        # an actor observed dead on the final poll is not reported as a
        # timeout and no extra second is spent once it is already dead.
        while ray.actors(actor_id=actor_id)["State"] != dead_state:
            if wait_time >= max_wait_time:
                # Raise explicitly instead of `assert None`: an assert
                # statement is removed under `python -O`, which would
                # leave this loop without any upper bound.
                raise AssertionError(
                    "It took too much time to kill an actor: {}".format(
                        handle._actor_id))
            time.sleep(1.0)
            wait_time += 1

    def create_and_kill_actor(actor_name):
        # Make sure same name is creatable after killing it.
        detached_actor = DetachedActor.options(name=actor_name).remote()
        # Wait for detached actor creation.
        assert ray.get(detached_actor.ping.remote()) == "pong"
        ray.kill(detached_actor)
        # Wait until actor dies.
        wait_until_actor_dead(detached_actor)

    create_and_kill_actor(dup_actor_name)

    # This shouldn't be broken because actor
    # name should have been cleaned up from GCS.
    create_and_kill_actor(dup_actor_name)

    redis_address = ray_start_regular["redis_address"]
    # The script is filled in with str.format, so it must not contain any
    # literal braces other than the two placeholders.
    driver_script = """
import ray
import time
ray.init(address="{}")

@ray.remote
class DetachedActor:
    def ping(self):
        return "pong"

# Make sure same name is creatable after killing it.
detached_actor = DetachedActor.options(name="{}").remote()
assert ray.get(detached_actor.ping.remote()) == "pong"
ray.kill(detached_actor)
# Wait until actor dies.
actor_id = detached_actor._actor_id.hex()
max_wait_time = 10
wait_time = 0
while ray.actors(actor_id=actor_id)["State"] != 3:
    if wait_time >= max_wait_time:
        raise AssertionError("It took too much time to kill an actor")
    time.sleep(1.0)
    wait_time += 1
""".format(redis_address, dup_actor_name)

    run_string_as_driver(driver_script)
    # Make sure the name can be reused here after another driver script
    # created and killed an actor with it.
    create_and_kill_actor(dup_actor_name)
@pytest.mark.parametrize(
    "ray_start_cluster", [{
        "num_cpus": 3,
        "num_nodes": 1,
        "resources": {
            "first_node": 5
        }
    }],
    indirect=True)
def test_detached_actor_cleanup_due_to_failure(ray_start_cluster):
    """Check that a detached actor's name is freed when it dies by failure.

    Two failure modes are exercised: the actor's worker process exiting on
    its own, and the node hosting the actor being removed from the cluster.
    After each one, an actor with the same name must be creatable again.

    Args:
        ray_start_cluster: Cluster fixture, parametrized above with one
            node carrying the custom resource "first_node". A second node
            carrying "second_node" is added inside the test.

    Raises:
        AssertionError: If an actor does not reach the dead state within
            the polling budget, or if a ping does not return "pong".
    """
    cluster = ray_start_cluster
    node = cluster.add_node(resources={"second_node": 1})
    cluster.wait_for_nodes()

    @ray.remote
    class DetachedActor:
        def ping(self):
            return "pong"

        def kill_itself(self):
            # kill itself.
            os._exit(0)

    worker_failure_actor_name = "worker_failure_actor_name"
    node_failure_actor_name = "node_failure_actor_name"

    def wait_until_actor_dead(handle):
        """Poll the actor table until `handle`'s actor is reported dead."""
        # NOTE(review): 3 is presumably the numeric value of
        # ActorTableData.DEAD -- confirm against the actor table definition.
        dead_state = 3
        max_wait_time = 10
        actor_id = handle._actor_id.hex()
        wait_time = 0
        # Test the freshly fetched state before sleeping or timing out, so
        # an actor observed dead on the final poll is not reported as a
        # timeout and no extra second is spent once it is already dead.
        while ray.actors(actor_id=actor_id)["State"] != dead_state:
            if wait_time >= max_wait_time:
                # Raise explicitly instead of `assert None`: an assert
                # statement is removed under `python -O`, which would
                # leave this loop without any upper bound.
                raise AssertionError(
                    "It took too much time to kill an actor: {}".format(
                        handle._actor_id))
            time.sleep(1.0)
            wait_time += 1

    def create_detached_actor_blocking(actor_name,
                                       schedule_in_second_node=False):
        """Create a named actor pinned to one node and wait until it runs."""
        # The custom resource request decides which node hosts the actor.
        resources = ({
            "second_node": 1
        } if schedule_in_second_node else {
            "first_node": 1
        })
        actor_handle = DetachedActor.options(
            name=actor_name, resources=resources).remote()
        # Wait for detached actor creation.
        assert ray.get(actor_handle.ping.remote()) == "pong"
        return actor_handle

    # Name should be cleaned when workers fail
    detached_actor = create_detached_actor_blocking(worker_failure_actor_name)
    detached_actor.kill_itself.remote()
    wait_until_actor_dead(detached_actor)
    # Name should be available now.
    detached_actor = create_detached_actor_blocking(worker_failure_actor_name)
    assert ray.get(detached_actor.ping.remote()) == "pong"

    # Name should be cleaned when nodes fail.
    detached_actor = create_detached_actor_blocking(
        node_failure_actor_name, schedule_in_second_node=True)
    cluster.remove_node(node)
    wait_until_actor_dead(detached_actor)
    # Name should be available now.
    detached_actor = create_detached_actor_blocking(node_failure_actor_name)
    assert ray.get(detached_actor.ping.remote()) == "pong"
def test_kill(ray_start_regular):
@ray.remote
class Actor:
+9 -1
View File
@@ -3,6 +3,7 @@ import logging
import ray
import ray.cloudpickle as pickle
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
from ray.gcs_utils import ActorTableData
logger = logging.getLogger(__name__)
@@ -30,7 +31,14 @@ def _get_actor(name):
raise ValueError(
"The actor with name={} doesn't exist".format(name))
handle = pickle.loads(pickled_state)
# If the actor state is dead, that means that this name is reusable.
# We don't delete the name entry from key value store when
# the actor is killed because ray.kill is asynchronous,
# and it can cause worker leaks.
actor_info = ray.actors(actor_id=handle._actor_id.hex())
actor_state = actor_info.get("State", None)
if actor_state and actor_state == ActorTableData.DEAD:
raise ValueError("The actor with name={} is dead.".format(name))
return handle