Revert "[Core]Fix ray.kill doesn't cancel pending actor bug (#13254)" (#14013)

This reverts commit 2092b097ea.
This commit is contained in:
Simon Mo
2021-02-09 11:36:38 -08:00
committed by GitHub
parent 1dcdfe9101
commit f51c26bae6
19 changed files with 54 additions and 325 deletions
-84
View File
@@ -1093,90 +1093,6 @@ def test_actor_resource_demand(shutdown_only):
global_state_accessor.disconnect()
def test_kill_pending_actor_with_no_restart_true():
cluster = ray.init()
global_state_accessor = GlobalStateAccessor(
cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
global_state_accessor.connect()
@ray.remote(resources={"WORKER": 1.0})
class PendingActor:
pass
# Kill actor with `no_restart=True`.
actor = PendingActor.remote()
# TODO(ffbin): The raylet doesn't guarantee the order when dealing with
# RequestWorkerLease and CancelWorkerLease. If we kill the actor
# immediately after creating the actor, we may not be able to clean up
# the request cached by the raylet.
# See https://github.com/ray-project/ray/issues/13545 for details.
time.sleep(1)
ray.kill(actor, no_restart=True)
def condition1():
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
return True
return False
# Actor is dead, so the infeasible task queue length is 0.
wait_for_condition(condition1, timeout=10)
global_state_accessor.disconnect()
ray.shutdown()
def test_kill_pending_actor_with_no_restart_false():
cluster = ray.init()
global_state_accessor = GlobalStateAccessor(
cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
global_state_accessor.connect()
@ray.remote(resources={"WORKER": 1.0}, max_restarts=1)
class PendingActor:
pass
# Kill actor with `no_restart=False`.
actor = PendingActor.remote()
# TODO(ffbin): The raylet doesn't guarantee the order when dealing with
# RequestWorkerLease and CancelWorkerLease. If we kill the actor
# immediately after creating the actor, we may not be able to clean up
# the request cached by the raylet.
# See https://github.com/ray-project/ray/issues/13545 for details.
time.sleep(1)
ray.kill(actor, no_restart=False)
def condition1():
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
return False
return True
# Actor restarts, so the infeasible task queue length is 1.
wait_for_condition(condition1, timeout=10)
# Kill actor again and actor is dead,
# so the infeasible task queue length is 0.
ray.kill(actor, no_restart=False)
def condition2():
message = global_state_accessor.get_all_resource_usage()
resource_usages = ray.gcs_utils.ResourceUsageBatchData.FromString(
message)
if len(resource_usages.resource_load_by_shape.resource_demands) == 0:
return True
return False
wait_for_condition(condition2, timeout=10)
global_state_accessor.disconnect()
ray.shutdown()
if __name__ == "__main__":
import pytest
# Test suite is timing out. Disable on windows for now.
+3 -9
View File
@@ -902,10 +902,8 @@ def test_capture_child_actors(ray_start_cluster):
# Kill an actor and wait until it is killed.
ray.kill(a)
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())
except ray.exceptions.RayActorError:
pass
# Now create an actor, but do not capture the current tasks
a = Actor.options(
@@ -927,10 +925,8 @@ def test_capture_child_actors(ray_start_cluster):
# Kill an actor and wait until it is killed.
ray.kill(a)
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())
except ray.exceptions.RayActorError:
pass
# Lastly, make sure when None is specified, actors are not scheduled
# on the same placement group.
@@ -1420,10 +1416,8 @@ ray.shutdown()
# Kill an actor and wait until it is killed.
ray.kill(a)
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(a.ready.remote())
except ray.exceptions.RayActorError:
pass
# We should have 2 alive pgs and 4 alive actors.
assert assert_alive_num_pg(2)
+2 -4
View File
@@ -199,19 +199,17 @@ def test_custom_resources(ray_start_regular_shared):
assert current_resources["CPU"] == 1.0
# By default an actor should not reserve any resources.
q = Queue()
Queue()
current_resources = ray.available_resources()
assert current_resources["CPU"] == 1.0
q.shutdown()
# Specify resource requirement. The queue should now reserve 1 CPU.
q = Queue(actor_options={"num_cpus": 1})
Queue(actor_options={"num_cpus": 1})
def no_cpu_in_resources():
return "CPU" not in ray.available_resources()
wait_for_condition(no_cpu_in_resources)
q.shutdown()
if __name__ == "__main__":
+1 -3
View File
@@ -470,10 +470,8 @@ def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put,
# Test that the actor exiting stops the reference from being pinned.
ray.kill(actor)
# Wait for the actor to exit.
try:
with pytest.raises(ray.exceptions.RayActorError):
ray.get(actor.delete_ref1.remote())
except ray.exceptions.RayActorError:
pass
else:
# Test that deleting the second reference stops it from being pinned.
ray.get(actor.delete_ref2.remote())