Actor resource backlog hotfix (#12471)

* prepare implemented

* works?

* deflek

* git

* deflek round 2

* .

* improve the test

Co-authored-by: Alex <alex@anyscale.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
This commit is contained in:
Alex Wu
2020-11-29 20:55:50 -08:00
committed by GitHub
parent fb318addcb
commit f1cc33a6a6
3 changed files with 62 additions and 1 deletions
+54
View File
@@ -14,6 +14,7 @@ import ray.cluster_utils
from ray.test_utils import (run_string_as_driver, get_non_head_nodes,
wait_for_condition)
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
from ray._raylet import GlobalStateAccessor
def test_remote_functions_not_scheduled_on_actors(ray_start_regular):
@@ -1038,6 +1039,59 @@ def test_get_actor_no_input(ray_start_regular_shared):
ray.get_actor(bad_name)
def test_actor_resource_demand(shutdown_only):
ray.shutdown()
cluster = ray.init(num_cpus=3)
global_state_accessor = GlobalStateAccessor(
cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
global_state_accessor.connect()
@ray.remote(num_cpus=2)
class Actor:
def foo(self):
return "ok"
a = Actor.remote()
ray.get(a.foo.remote())
time.sleep(1)
message = global_state_accessor.get_all_heartbeat()
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
# The actor is scheduled so there should be no more demands left.
assert len(heartbeat.resource_load_by_shape.resource_demands) == 0
@ray.remote(num_cpus=80)
class Actor2:
pass
actors = []
actors.append(Actor2.remote())
time.sleep(1)
# This actor cannot be scheduled.
message = global_state_accessor.get_all_heartbeat()
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
assert len(heartbeat.resource_load_by_shape.resource_demands) == 1
assert (heartbeat.resource_load_by_shape.resource_demands[0].shape == {
"CPU": 80.0
})
assert (heartbeat.resource_load_by_shape.resource_demands[0]
.num_infeasible_requests_queued == 1)
actors.append(Actor2.remote())
time.sleep(1)
# Two actors cannot be scheduled.
message = global_state_accessor.get_all_heartbeat()
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
assert len(heartbeat.resource_load_by_shape.resource_demands) == 1
assert (heartbeat.resource_load_by_shape.resource_demands[0]
.num_infeasible_requests_queued == 2)
global_state_accessor.disconnect()
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))