diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py
index a812c6731..af490eb77 100644
--- a/python/ray/tests/test_actor_advanced.py
+++ b/python/ray/tests/test_actor_advanced.py
@@ -14,6 +14,7 @@ import ray.cluster_utils
 from ray.test_utils import (run_string_as_driver, get_non_head_nodes,
                             wait_for_condition)
 from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
+from ray._raylet import GlobalStateAccessor
 
 
 def test_remote_functions_not_scheduled_on_actors(ray_start_regular):
@@ -1038,6 +1039,84 @@ def test_get_actor_no_input(ray_start_regular_shared):
             ray.get_actor(bad_name)
 
 
+def test_actor_resource_demand(shutdown_only):
+    """Check that actor creation requests are reported as resource demand.
+
+    A feasible actor must leave no demand behind once it is running, while
+    each infeasible actor must be counted in the heartbeat's resource load.
+    """
+    # NOTE(review): the preceding test uses a shared cluster fixture, so
+    # presumably Ray may still be initialized here -- shut it down before
+    # starting a fresh 3-CPU cluster.
+    ray.shutdown()
+    cluster = ray.init(num_cpus=3)
+    global_state_accessor = GlobalStateAccessor(
+        cluster["redis_address"], ray.ray_constants.REDIS_DEFAULT_PASSWORD)
+    global_state_accessor.connect()
+
+    def get_resource_demands():
+        """Return the demands in the latest heartbeat, or None if absent."""
+        message = global_state_accessor.get_all_heartbeat()
+        # NOTE(review): assumes None means no heartbeat batch is available
+        # yet -- confirm against GlobalStateAccessor.get_all_heartbeat.
+        if message is None:
+            return None
+        heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(message)
+        return heartbeat.resource_load_by_shape.resource_demands
+
+    def no_demands():
+        """Return True once a heartbeat reports no resource demands."""
+        demands = get_resource_demands()
+        return demands is not None and len(demands) == 0
+
+    def infeasible_count_is(expected):
+        """Return True once one shape has `expected` infeasible requests."""
+        demands = get_resource_demands()
+        return (demands is not None and len(demands) == 1 and
+                demands[0].num_infeasible_requests_queued == expected)
+
+    # Use try/finally so the accessor is disconnected even when an
+    # assertion fails.
+    try:
+        @ray.remote(num_cpus=2)
+        class Actor:
+            def foo(self):
+                return "ok"
+
+        a = Actor.remote()
+        ray.get(a.foo.remote())
+
+        # The actor is scheduled so there should be no more demands left.
+        # Poll rather than sleep for a fixed second: a fixed delay can be
+        # too short on a slow machine and makes the test flaky.
+        wait_for_condition(no_demands)
+        assert len(get_resource_demands()) == 0
+
+        @ray.remote(num_cpus=80)
+        class Actor2:
+            pass
+
+        actors = []
+        actors.append(Actor2.remote())
+
+        # This actor cannot be scheduled.
+        wait_for_condition(lambda: infeasible_count_is(1))
+        demands = get_resource_demands()
+        assert len(demands) == 1
+        assert demands[0].shape == {"CPU": 80.0}
+        assert demands[0].num_infeasible_requests_queued == 1
+
+        actors.append(Actor2.remote())
+
+        # Two actors cannot be scheduled.
+        wait_for_condition(lambda: infeasible_count_is(2))
+        demands = get_resource_demands()
+        assert len(demands) == 1
+        assert demands[0].num_infeasible_requests_queued == 2
+    finally:
+        global_state_accessor.disconnect()
+
+
 if __name__ == "__main__":
     import pytest
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
index a1fc20f88..8e2329c7b 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -35,6 +35,7 @@ GcsActorScheduler::GcsActorScheduler(
       gcs_pub_sub_(std::move(gcs_pub_sub)),
       schedule_failure_handler_(std::move(schedule_failure_handler)),
       schedule_success_handler_(std::move(schedule_success_handler)),
+      report_worker_backlog_(RayConfig::instance().report_worker_backlog()),
       lease_client_factory_(std::move(lease_client_factory)),
       core_worker_clients_(client_factory) {
   RAY_CHECK(schedule_failure_handler_ != nullptr && schedule_success_handler_ != nullptr);
@@ -208,6 +209,9 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
   remote_address.set_ip_address(node->node_manager_address());
   remote_address.set_port(node->node_manager_port());
   auto lease_client = GetOrConnectLeaseClient(remote_address);
+  // Actor leases should be sent to the raylet immediately, so we should never build up a
+  // backlog in GCS.
+  int backlog_size = report_worker_backlog_ ? 0 : -1;
   lease_client->RequestWorkerLease(
       actor->GetCreationTaskSpecification(),
       [this, node_id, actor, node](const Status &status,
@@ -247,7 +251,8 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
             RetryLeasingWorkerFromNode(actor, node);
           }
         }
-      });
+      },
+      backlog_size);
 }
 
 void GcsActorScheduler::RetryLeasingWorkerFromNode(
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
index 99456adec..c1ebebda9 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h
@@ -286,6 +286,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
   std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler_;
   /// The handler to handle the successful scheduling.
   std::function<void(std::shared_ptr<GcsActor>)> schedule_success_handler_;
+  /// Whether or not to report the backlog of actors waiting to be scheduled.
+  bool report_worker_backlog_;
   /// Factory for producing new clients to request leases from remote nodes.
   LeaseClientFactoryFn lease_client_factory_;
   /// The nodes which are releasing unused workers.