From 263df6163c108c4aed7c4609cc2895bea9964966 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 18 Aug 2020 12:44:00 -0700 Subject: [PATCH] [Placement Group] Placement group remove api part 1 (#10063) * Added basic rpc calls. * fix issues. * Fix the gcs server not getting request issue. * In Progress. * Basic logic done. Tests are required. * In progress. * In progress in refactoring context. * Revert "In progress in refactoring context." This reverts commit 38236256cf1306c60dd203e75d45ceb4509c8106. * Working now. * Python test works. * Lint. * Addressed code review. * Addressed code review. * Lint. * Added unit tests. * Done, but one of unit tests fail * Addressed code review. * Addressed the last code review. * Fix the wrong test case. --- python/ray/_raylet.pyx | 10 ++ python/ray/experimental/__init__.py | 6 +- python/ray/experimental/placement_group.py | 13 +- python/ray/includes/libcoreworker.pxd | 2 + python/ray/state.py | 6 +- python/ray/tests/test_placement_group.py | 76 ++++++++- src/ray/core_worker/core_worker.cc | 23 ++- src/ray/core_worker/core_worker.h | 13 +- src/ray/gcs/accessor.h | 12 +- .../gcs/gcs_client/service_based_accessor.cc | 18 ++ .../gcs/gcs_client/service_based_accessor.h | 3 + .../gcs_server/gcs_placement_group_manager.cc | 160 ++++++++++++++---- .../gcs_server/gcs_placement_group_manager.h | 45 ++++- .../gcs_placement_group_scheduler.cc | 158 ++++++++++++----- .../gcs_placement_group_scheduler.h | 50 +++++- .../test/gcs_placement_group_manager_test.cc | 151 +++++++++++++++-- .../gcs_placement_group_scheduler_test.cc | 76 ++++++++- .../gcs_server/test/gcs_server_test_util.h | 14 ++ src/ray/gcs/redis_accessor.cc | 5 + src/ray/gcs/redis_accessor.h | 3 + src/ray/protobuf/gcs.proto | 8 +- src/ray/protobuf/gcs_service.proto | 13 ++ src/ray/rpc/gcs_server/gcs_rpc_client.h | 3 + src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 + 24 files changed, 762 insertions(+), 111 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx 
index ab7f83ade..2098df5c4 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1083,6 +1083,16 @@ cdef class CoreWorker: return PlacementGroupID(c_placement_group_id.Binary()) + def remove_placement_group(self, PlacementGroupID placement_group_id): + cdef: + CPlacementGroupID c_placement_group_id = \ + placement_group_id.native() + + with nogil: + check_status( + CCoreWorkerProcess.GetCoreWorker(). + RemovePlacementGroup(c_placement_group_id)) + def submit_actor_task(self, Language language, ActorID actor_id, diff --git a/python/ray/experimental/__init__.py b/python/ray/experimental/__init__.py index 3647380b3..4ef40a716 100644 --- a/python/ray/experimental/__init__.py +++ b/python/ray/experimental/__init__.py @@ -1,8 +1,10 @@ from .api import get, wait from .dynamic_resources import set_resource from .object_spilling import force_spill_objects, force_restore_spilled_objects -from .placement_group import (placement_group, placement_group_table) +from .placement_group import (placement_group, placement_group_table, + remove_placement_group) __all__ = [ "get", "wait", "set_resource", "force_spill_objects", - "force_restore_spilled_objects", "placement_group", "placement_group_table" + "force_restore_spilled_objects", "placement_group", + "placement_group_table", "remove_placement_group" ] diff --git a/python/ray/experimental/placement_group.py b/python/ray/experimental/placement_group.py index c4cecb458..980f2147b 100644 --- a/python/ray/experimental/placement_group.py +++ b/python/ray/experimental/placement_group.py @@ -1,6 +1,9 @@ -import ray from typing import (List, Dict) +import ray +from ray._raylet import ( + PlacementGroupID, ) + def placement_group(bundles: List[Dict[str, float]], strategy: str = "PACK", @@ -32,6 +35,14 @@ def placement_group(bundles: List[Dict[str, float]], return placement_group_id +def remove_placement_group(placement_group_id: PlacementGroupID): + assert type(placement_group_id) == PlacementGroupID + worker = 
ray.worker.global_worker + worker.check_connected() + + worker.core_worker.remove_placement_group(placement_group_id) + + def placement_group_table(placement_group_id): assert placement_group_id is not None worker = ray.worker.global_worker diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 871e5d0c3..a33de9ca8 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -98,6 +98,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus CreatePlacementGroup( const CPlacementGroupCreationOptions &options, CPlacementGroupID *placement_group_id) + CRayStatus RemovePlacementGroup( + const CPlacementGroupID &placement_group_id) void SubmitActorTask( const CActorID &actor_id, const CRayFunction &function, const c_vector[unique_ptr[CTaskArg]] &args, diff --git a/python/ray/state.py b/python/ray/state.py index 8c1e7b821..1d6c44104 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -404,10 +404,10 @@ class GlobalState: def get_state(state): if state == ray.gcs_utils.PlacementGroupTableData.PENDING: return "PENDING" - elif state == ray.gcs_utils.PlacementGroupTableData.ALIVE: - return "ALIVE" + elif state == ray.gcs_utils.PlacementGroupTableData.CREATED: + return "CREATED" else: - return "DEAD" + return "REMOVED" def get_strategy(strategy): if strategy == PlacementStrategy.PACK: diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 50234da94..6fea9e273 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -8,8 +8,9 @@ except ImportError: pytest_timeout = None import ray -import ray.test_utils +from ray.test_utils import wait_for_condition import ray.cluster_utils +from ray._raylet import PlacementGroupID def test_placement_group_pack(ray_start_cluster): @@ -219,6 +220,77 @@ def test_placement_group_hang(ray_start_cluster): assert "CPU_group_" in 
list(resources.keys())[0], resources +def test_remove_placement_group(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + # First try to remove a placement group that doesn't + # exist. This should not do anything. + random_placement_group_id = PlacementGroupID.from_random() + for _ in range(3): + ray.experimental.remove_placement_group(random_placement_group_id) + + # Removing a placement group as soon as it is + # created should work. + pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}]) + ray.experimental.remove_placement_group(pid) + + def is_placement_group_removed(): + table = ray.experimental.placement_group_table(pid) + if "state" not in table: + return False + return table["state"] == "REMOVED" + + wait_for_condition(is_placement_group_removed) + + # # Now let's create a placement group. + pid = ray.experimental.placement_group([{"CPU": 2}, {"CPU": 2}]) + + # # This is a hack to wait for placement group creation. + # # TODO(sang): Remove it when wait is implemented. + @ray.remote(num_cpus=0) + class A: + def f(self): + return 3 + + a = A.options(placement_group_id=pid).remote() + assert ray.get(a.f.remote()) == 3 + ray.experimental.remove_placement_group(pid) + # # Subsequent remove request shouldn't do anything + for _ in range(3): + ray.experimental.remove_placement_group(pid) + + # # Make sure placement group resources are + # # released and we can schedule this task. + @ray.remote(num_cpus=4) + def f(): + return 3 + + assert ray.get(f.remote()) == 3 + + # Since the placement group is removed, + # the actor should've been killed. + # That means this request should fail. + # TODO(sang): Turn it on. + # ray.get(a.f.remote()) + + +def test_remove_pending_placement_group(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=4) + ray.init(address=cluster.address) + # Create a placement group that cannot be scheduled now. 
+ pid = ray.experimental.placement_group([{"GPU": 2}, {"CPU": 2}]) + ray.experimental.remove_placement_group(pid) + # TODO(sang): Add state check here. + @ray.remote(num_cpus=4) + def f(): + return 3 + + # Make sure this task is still schedulable. + assert ray.get(f.remote()) == 3 + + def test_placement_group_table(ray_start_cluster): @ray.remote(num_cpus=2) class Actor(object): @@ -257,7 +329,7 @@ def test_placement_group_table(ray_start_cluster): ray.get(actor_1.value.remote()) result = ray.experimental.placement_group_table(placement_group_id) - assert result["state"] == "ALIVE" + assert result["state"] == "CREATED" def test_cuda_visible_devices(ray_start_cluster): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 60dfbf94e..2f8dd59a5 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1309,7 +1309,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, Status CoreWorker::CreatePlacementGroup( const PlacementGroupCreationOptions &placement_group_creation_options, PlacementGroupID *return_placement_group_id) { - const PlacementGroupID placement_group_id = PlacementGroupID ::FromRandom(); + const PlacementGroupID placement_group_id = PlacementGroupID::FromRandom(); PlacementGroupSpecBuilder builder; builder.SetPlacementGroupSpec(placement_group_id, placement_group_creation_options.name, placement_group_creation_options.bundles, @@ -1322,6 +1322,27 @@ Status CoreWorker::CreatePlacementGroup( return Status::OK(); } +Status CoreWorker::RemovePlacementGroup(const PlacementGroupID &placement_group_id) { + std::shared_ptr> status_promise = + std::make_shared>(); + // Synchronously wait for placement group removal. 
+ RAY_UNUSED(gcs_client_->PlacementGroups().AsyncRemovePlacementGroup( + placement_group_id, + [status_promise](Status status) { status_promise->set_value(status); })); + auto status_future = status_promise->get_future(); + if (status_future.wait_for(std::chrono::seconds( + RayConfig::instance().gcs_server_request_timeout_seconds())) != + std::future_status::ready) { + std::ostringstream stream; + stream << "There was timeout in removing the placement group of id " + << placement_group_id + << ". It is probably " + "because GCS server is dead or there's a high load there."; + return Status::TimedOut(stream.str()); + } + return status_future.get(); +} + void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function, const std::vector> &args, const TaskOptions &task_options, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 31211d867..4fbe30068 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -627,13 +627,22 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] function The remote function that generates the placement group object. /// \param[in] placement_group_creation_options Options for this placement group - /// creation task. \param[out] placement_group_id ID of the created placement group. - /// This can be used to shedule actor in node \return Status error if placement group + /// creation task. + /// \param[out] placement_group_id ID of the created placement group. + /// This can be used to schedule actor in node + /// \return Status error if placement group /// creation fails, likely due to raylet failure. Status CreatePlacementGroup( const PlacementGroupCreationOptions &placement_group_creation_options, PlacementGroupID *placement_group_id); + /// Remove a placement group. Note that this operation is synchronous. + /// + /// \param[in] placement_group_id The id of a placement group to remove. + /// \return Status OK if succeed. 
TimedOut if request to GCS server times out. + /// OK is also returned if the placement group is already removed or doesn't exist. + Status RemovePlacementGroup(const PlacementGroupID &placement_group_id); + /// Submit an actor task. /// /// \param[in] caller_id ID of the task submitter. diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index b401b1b7f..2624f9e68 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -732,8 +732,7 @@ class PlacementGroupInfoAccessor { public: virtual ~PlacementGroupInfoAccessor() = default; - // TODO(AlisaWu): fill the accessor. - /// Create an placement group to GCS asynchronously. + /// Create a placement group to GCS asynchronously. /// /// \param placement_group_spec The specification for the placement group creation task. /// \param callback Callback that will be called after the placement group info is @@ -750,6 +749,15 @@ class PlacementGroupInfoAccessor { const PlacementGroupID &placement_group_id, const OptionalItemCallback &callback) = 0; + /// Remove a placement group from GCS asynchronously. + /// + /// \param placement_group_id The id for the placement group to remove. + /// \param callback Callback that will be called after the placement group is + /// removed from GCS. + /// \return Status + virtual Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id, + const StatusCallback &callback) = 0; + protected: PlacementGroupInfoAccessor() = default; }; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 914a6724f..3682a367c 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -1457,6 +1457,24 @@ Status ServiceBasedPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( if (status.ok()) { RAY_LOG(DEBUG) << "Finished registering placement group. 
placement group id = " << placement_group_spec.PlacementGroupId(); + } else { + RAY_LOG(ERROR) << "Placement group id = " + << placement_group_spec.PlacementGroupId() + << " failed to be registered. " << status; + } + }); + return Status::OK(); +} + +Status ServiceBasedPlacementGroupInfoAccessor::AsyncRemovePlacementGroup( + const ray::PlacementGroupID &placement_group_id, const StatusCallback &callback) { + rpc::RemovePlacementGroupRequest request; + request.set_placement_group_id(placement_group_id.Binary()); + client_impl_->GetGcsRpcClient().RemovePlacementGroup( + request, + [callback](const Status &status, const rpc::RemovePlacementGroupReply &reply) { + if (callback) { + callback(status); } }); return Status::OK(); diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 4b99de3e7..6f3289c93 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -446,6 +446,9 @@ class ServiceBasedPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor Status AsyncCreatePlacementGroup( const PlacementGroupSpecification &placement_group_spec) override; + Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id, + const StatusCallback &callback) override; + Status AsyncGet( const PlacementGroupID &placement_group_id, const OptionalItemCallback &callback) override; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index e56e66e42..75ae6b146 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -56,6 +56,13 @@ const rpc::PlacementGroupTableData &GcsPlacementGroup::GetPlacementGroupTableDat return placement_group_table_data_; } +const std::string GcsPlacementGroup::DebugString() const { + std::stringstream stream; + stream << "placement group id = " << GetPlacementGroupID() << ", name = " 
<< GetName() + << ", strategy = " << GetStrategy(); + return stream.str(); +} + ///////////////////////////////////////////////////////////////////////////////////////// GcsPlacementGroupManager::GcsPlacementGroupManager( @@ -67,7 +74,7 @@ GcsPlacementGroupManager::GcsPlacementGroupManager( gcs_table_storage_(std::move(gcs_table_storage)) {} void GcsPlacementGroupManager::RegisterPlacementGroup( - const ray::rpc::CreatePlacementGroupRequest &request, EmptyCallback callback) { + const ray::rpc::CreatePlacementGroupRequest &request, StatusCallback callback) { RAY_CHECK(callback); const auto &placement_group_spec = request.placement_group_spec(); auto placement_group_id = @@ -80,8 +87,7 @@ void GcsPlacementGroupManager::RegisterPlacementGroup( // Mark the callback as pending and invoke it after the placement_group has been // successfully created. placement_group_to_register_callback_[placement_group_id] = std::move(callback); - registered_placement_groups_.emplace(placement_group->GetPlacementGroupID(), - placement_group); + registered_placement_groups_.emplace(placement_group_id, placement_group); pending_placement_groups_.emplace_back(std::move(placement_group)); SchedulePendingPlacementGroups(); } @@ -102,17 +108,17 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed( std::shared_ptr placement_group) { RAY_LOG(WARNING) << "Failed to create placement group " << placement_group->GetName() << ", try again."; - // We will attempt to schedule this placement_group once an eligible node is - // registered. + // We will attempt to schedule this placement_group once + // an eligible node is registered. 
pending_placement_groups_.emplace_back(std::move(placement_group)); - is_creating_ = false; + MarkSchedulingDone(); RetryCreatingPlacementGroup(); } void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess( const std::shared_ptr &placement_group) { RAY_LOG(INFO) << "Successfully created placement group " << placement_group->GetName(); - placement_group->UpdateState(rpc::PlacementGroupTableData::ALIVE); + placement_group->UpdateState(rpc::PlacementGroupTableData::CREATED); auto placement_group_id = placement_group->GetPlacementGroupID(); RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( placement_group_id, placement_group->GetPlacementGroupTableData(), @@ -123,27 +129,33 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess( // and remove it from placement_group_to_register_callback_. auto iter = placement_group_to_register_callback_.find(placement_group_id); if (iter != placement_group_to_register_callback_.end()) { - iter->second(); + iter->second(Status::OK()); placement_group_to_register_callback_.erase(iter); } - is_creating_ = false; + MarkSchedulingDone(); SchedulePendingPlacementGroups(); })); } void GcsPlacementGroupManager::SchedulePendingPlacementGroups() { - if (pending_placement_groups_.empty() || is_creating_) { + if (pending_placement_groups_.empty() || IsSchedulingInProgress()) { return; } - is_creating_ = true; - gcs_placement_group_scheduler_->Schedule( - pending_placement_groups_.front(), - [this](std::shared_ptr placement_group) { - OnPlacementGroupCreationFailed(std::move(placement_group)); - }, - [this](std::shared_ptr placement_group) { - OnPlacementGroupCreationSuccess(std::move(placement_group)); - }); + const auto placement_group = pending_placement_groups_.front(); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + // Do not reschedule if the placement group has removed already. 
+ if (registered_placement_groups_.find(placement_group_id) != + registered_placement_groups_.end()) { + MarkSchedulingStarted(placement_group_id); + gcs_placement_group_scheduler_->Schedule( + placement_group, + [this](std::shared_ptr placement_group) { + OnPlacementGroupCreationFailed(std::move(placement_group)); + }, + [this](std::shared_ptr placement_group) { + OnPlacementGroupCreationSuccess(std::move(placement_group)); + }); + } pending_placement_groups_.pop_front(); } @@ -153,24 +165,106 @@ void GcsPlacementGroupManager::HandleCreatePlacementGroup( ray::rpc::SendReplyCallback send_reply_callback) { auto placement_group_id = PlacementGroupID::FromBinary(request.placement_group_spec().placement_group_id()); - const auto &name = request.placement_group_spec().name(); - const auto &strategy = request.placement_group_spec().strategy(); - RAY_LOG(INFO) << "Registering placement group, placement group id = " - << placement_group_id << ", name = " << name - << ", strategy = " << PlacementStrategy_Name(strategy); auto placement_group = std::make_shared(request); + + RAY_LOG(INFO) << "Registering placement group, " << placement_group->DebugString(); + // We need this call here because otherwise, if placement group is removed right after + // here, it can cause inconsistent states. 
+ registered_placement_groups_.emplace(placement_group_id, placement_group); + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( placement_group_id, placement_group->GetPlacementGroupTableData(), - [this, request, reply, send_reply_callback, placement_group_id, name, - strategy](Status status) { + [this, request, reply, send_reply_callback, placement_group_id, + placement_group](Status status) { RAY_CHECK_OK(status); - RegisterPlacementGroup(request, [reply, send_reply_callback, placement_group_id, - name, strategy]() { - RAY_LOG(INFO) << "Finished registering placement group, placement group id = " - << placement_group_id << ", name = " << name - << ", strategy = " << strategy; - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - }); + if (registered_placement_groups_.find(placement_group_id) == + registered_placement_groups_.end()) { + std::stringstream stream; + stream << "Placement group of id " << placement_group_id + << " has been removed before registration."; + GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::NotFound(stream.str())); + return; + } + + RegisterPlacementGroup( + request, [reply, send_reply_callback, placement_group](Status status) { + if (status.ok()) { + RAY_LOG(INFO) << "Finished registering placement group, " + << placement_group->DebugString(); + } else { + RAY_LOG(WARNING) + << "Failed to register placement group, " + << placement_group->DebugString() << ", cause: " << status.message(); + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }); + })); +} + +void GcsPlacementGroupManager::HandleRemovePlacementGroup( + const rpc::RemovePlacementGroupRequest &request, + rpc::RemovePlacementGroupReply *reply, rpc::SendReplyCallback send_reply_callback) { + const auto placement_group_id = + PlacementGroupID::FromBinary(request.placement_group_id()); + + RemovePlacementGroup(placement_group_id, [send_reply_callback, reply, + placement_group_id](Status status) { + if (status.ok()) { + RAY_LOG(INFO) << 
"Placement group of an id, " << placement_group_id + << " is removed successfully."; + } + GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); + }); +} + +void GcsPlacementGroupManager::RemovePlacementGroup( + const PlacementGroupID &placement_group_id, + StatusCallback on_placement_group_removed) { + RAY_CHECK(on_placement_group_removed); + // If the placement group has been already removed, don't do anything. + auto placement_group_it = registered_placement_groups_.find(placement_group_id); + if (placement_group_it == registered_placement_groups_.end()) { + on_placement_group_removed(Status::OK()); + return; + } + auto placement_group = placement_group_it->second; + registered_placement_groups_.erase(placement_group_it); + + // Destroy all bundles. + gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists( + placement_group_id); + // Cancel the scheduling request if necessary. + if (IsSchedulingInProgress(placement_group_id)) { + // If the placement group is scheduling. + gcs_placement_group_scheduler_->MarkScheduleCancelled(placement_group_id); + } + + // Remove a placement group from a pending list if exists. + auto pending_it = std::find_if( + pending_placement_groups_.begin(), pending_placement_groups_.end(), + [placement_group_id](const std::shared_ptr &placement_group) { + return placement_group->GetPlacementGroupID() == placement_group_id; + }); + if (pending_it != pending_placement_groups_.end()) { + // The placement group was pending scheduling, remove it from the queue. + pending_placement_groups_.erase(pending_it); + } + + // Flush the status and respond to workers. 
+ placement_group->UpdateState(rpc::PlacementGroupTableData::REMOVED); + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put( + placement_group->GetPlacementGroupID(), + placement_group->GetPlacementGroupTableData(), + [this, on_placement_group_removed, placement_group_id](Status status) { + RAY_CHECK_OK(status); + // If placement group hasn't been created yet, send a response to a core worker + // that the creation of placement group has failed. + auto it = placement_group_to_register_callback_.find(placement_group_id); + if (it != placement_group_to_register_callback_.end()) { + it->second(Status::NotFound("Placement group is removed before it is created")); + placement_group_to_register_callback_.erase(it); + } + on_placement_group_removed(status); })); } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 93e03f775..a13231ef9 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -73,6 +73,9 @@ class GcsPlacementGroup { /// Get the Strategy rpc::PlacementStrategy GetStrategy() const; + // Get debug string for the placement group. + const std::string DebugString() const; + private: /// The placement_group meta data which contains the task specification as well as the /// state of the gcs placement_group and so on (see gcs.proto). 
@@ -107,6 +110,10 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { rpc::CreatePlacementGroupReply *reply, rpc::SendReplyCallback send_reply_callback) override; + void HandleRemovePlacementGroup(const rpc::RemovePlacementGroupRequest &request, + rpc::RemovePlacementGroupReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + void HandleGetPlacementGroup(const rpc::GetPlacementGroupRequest &request, rpc::GetPlacementGroupReply *reply, rpc::SendReplyCallback send_reply_callback) override; @@ -116,10 +123,10 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// \param request Contains the meta info to create the placement_group. /// \param callback Will be invoked after the placement_group is created successfully or /// be invoked immediately if the placement_group is already registered to - /// `registered_placement_groups_` and its state is `ALIVE`. The callback will not be + /// `registered_placement_groups_` and its state is `CREATED`. The callback will not be /// called in this case. void RegisterPlacementGroup(const rpc::CreatePlacementGroupRequest &request, - EmptyCallback callback); + StatusCallback callback); /// Schedule placement_groups in the `pending_placement_groups_` queue. /// This function is exposed for testing only. @@ -145,31 +152,59 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { void OnPlacementGroupCreationSuccess( const std::shared_ptr &placement_group); + /// TODO-SANG Fill it up. + void RemovePlacementGroup(const PlacementGroupID &placement_group_id, + StatusCallback on_placement_group_removed); + private: /// Try to create placement group after a short time. void RetryCreatingPlacementGroup(); + /// Mark the manager that there's a placement group scheduling going on. 
+ void MarkSchedulingStarted(const PlacementGroupID placement_group_id) { + scheduling_in_progress_id_ = placement_group_id; + } + + /// Mark the manager that there's no more placement group scheduling going on. + void MarkSchedulingDone() { scheduling_in_progress_id_ = PlacementGroupID::Nil(); } + + /// Check if the placement group of a given id is scheduling. + bool IsSchedulingInProgress(const PlacementGroupID &placement_group_id) const { + return scheduling_in_progress_id_ == placement_group_id; + } + + /// Check if there's any placement group scheduling going on. + bool IsSchedulingInProgress() const { + return scheduling_in_progress_id_ != PlacementGroupID::Nil(); + } + /// The io loop that is used to delay execution of tasks (e.g., /// execute_after). boost::asio::io_context &io_context_; /// Callback of placement_group registration requests that are not yet flushed. - absl::flat_hash_map + absl::flat_hash_map placement_group_to_register_callback_; /// All registered placement_groups (pending placement_groups are also included). absl::flat_hash_map> registered_placement_groups_; + /// The pending placement_groups which will not be scheduled until there's a resource /// change. std::deque> pending_placement_groups_; + /// The scheduler to schedule all registered placement_groups. std::shared_ptr gcs_placement_group_scheduler_; + /// Used to update placement group information upon creation, deletion, etc. std::shared_ptr gcs_table_storage_; - /// If a placement group is creating - bool is_creating_ = false; + + /// The placement group id that is in progress of scheduling bundles. + /// TODO(sang): Currently, only one placement group can be scheduled at a time. + /// We should probably support concurrent creation (or batching). 
+ PlacementGroupID scheduling_in_progress_id_ = PlacementGroupID::Nil(); }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc index 8a6246792..dca55e0e8 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc @@ -134,57 +134,83 @@ void GcsPlacementGroupScheduler::Schedule( // If schedule success, the decision will be set as schedule_map[bundles[pos]] // else will be set ClientID::Nil(). - auto bundle_locations = std::make_shared>, pair_hash>>(); + auto bundle_locations = std::make_shared(); // To count how many scheduler have been return, which include success and failure. auto finished_count = std::make_shared(); + RAY_CHECK( + placement_group_leasing_in_progress_.emplace(placement_group->GetPlacementGroupID()) + .second); + /// TODO(AlisaWu): Change the strategy when reserve resource failed. for (auto &bundle : bundles) { const auto &bundle_id = bundle->BundleId(); const auto &node_id = selected_nodes[bundle_id]; RAY_CHECK(node_to_bundles_when_leasing_[node_id].emplace(bundle_id).second); + ReserveResourceFromNode( bundle, gcs_node_manager_.GetNode(node_id), [this, bundle_id, bundle, bundles, node_id, placement_group, bundle_locations, finished_count, failure_callback, success_callback](const Status &status) { + auto leasing_bundles = node_to_bundles_when_leasing_.find(node_id); + RAY_CHECK(leasing_bundles != node_to_bundles_when_leasing_.end()); + auto bundle_iter = leasing_bundles->second.find(bundle->BundleId()); + RAY_CHECK(bundle_iter != leasing_bundles->second.end()); + // Remove the bundle from the leasing map as the reply is returned from the + // remote node. 
+ leasing_bundles->second.erase(bundle_iter); + if (leasing_bundles->second.empty()) { + node_to_bundles_when_leasing_.erase(leasing_bundles); + } + if (status.ok()) { (*bundle_locations)[bundle_id] = std::make_pair(node_id, bundle); } if (++(*finished_count) == bundles.size()) { - if (bundle_locations->size() == bundles.size()) { - rpc::ScheduleData data; - for (const auto &iter : bundles) { - // TODO(ekl) this is a hack to get a string key for the proto - auto key = - iter->PlacementGroupId().Hex() + "_" + std::to_string(iter->Index()); - data.mutable_schedule_plan()->insert( - {key, (*bundle_locations)[iter->BundleId()].first.Binary()}); - } - - // Update `node_to_leased_bundles_`. - for (const auto &iter : *bundle_locations) { - const auto &location = iter.second; - node_to_leased_bundles_[location.first].push_back(location.second); - } - - RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put( - placement_group->GetPlacementGroupID(), data, - [success_callback, placement_group](Status status) { - success_callback(placement_group); - })); - } else { - for (const auto &iter : *bundle_locations) { - CancelResourceReserve(iter.second.second, - gcs_node_manager_.GetNode(node_id)); - } - failure_callback(placement_group); - } + OnAllBundleSchedulingRequestReturned(placement_group, bundles, + bundle_locations, failure_callback, + success_callback); } }); } } +void GcsPlacementGroupScheduler::DestroyPlacementGroupBundleResourcesIfExists( + const PlacementGroupID &placement_group_id) { + auto it = placement_group_to_bundle_location_.find(placement_group_id); + // If bundle location has been already removed, it means bundles + // are already destroyed. Do nothing. 
+ if (it == placement_group_to_bundle_location_.end()) { + return; + } + + std::shared_ptr bundle_locations = it->second; + for (const auto &iter : *bundle_locations) { + auto &bundle_spec = iter.second.second; + auto &node_id = iter.second.first; + CancelResourceReserve(bundle_spec, gcs_node_manager_.GetNode(node_id)); + } + placement_group_to_bundle_location_.erase(it); + + // Remove bundles from node_to_leased_bundles_ because bundles are removed now. + for (const auto &bundle_location : *bundle_locations) { + const auto &bundle_id = bundle_location.first; + const auto &node_id = bundle_location.second.first; + const auto &leased_bundles_it = node_to_leased_bundles_.find(node_id); + // node could've been already dead at this point. + if (leased_bundles_it != node_to_leased_bundles_.end()) { + leased_bundles_it->second.erase(bundle_id); + } + } +} + +void GcsPlacementGroupScheduler::MarkScheduleCancelled( + const PlacementGroupID &placement_group_id) { + auto it = placement_group_leasing_in_progress_.find(placement_group_id); + RAY_CHECK(it != placement_group_leasing_in_progress_.end()); + placement_group_leasing_in_progress_.erase(it); +} + void GcsPlacementGroupScheduler::ReserveResourceFromNode( const std::shared_ptr &bundle, const std::shared_ptr &node, const StatusCallback &callback) { @@ -197,15 +223,11 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( RAY_LOG(INFO) << "Leasing resource from node " << node_id << " for bundle: " << bundle->DebugString(); lease_client->RequestResourceReserve( - *bundle, [this, node_id, bundle, callback]( + *bundle, [node_id, bundle, callback]( const Status &status, const rpc::RequestResourceReserveReply &reply) { // TODO(AlisaWu): Add placement group cancel. auto result = reply.success() ? 
Status::OK() : Status::IOError("Failed to reserve resource"); - auto bundles = node_to_bundles_when_leasing_.find(node_id); - RAY_CHECK(bundles != node_to_bundles_when_leasing_.end()); - auto bundle_iter = bundles->second.find(bundle->BundleId()); - RAY_CHECK(bundle_iter != bundles->second.end()); if (result.ok()) { RAY_LOG(INFO) << "Finished leasing resource from " << node_id << " for bundle: " << bundle->DebugString(); @@ -213,12 +235,6 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( RAY_LOG(WARNING) << "Failed to lease resource from " << node_id << " for bundle: " << bundle->DebugString(); } - // Remove the bundle from the leasing map as the reply is returned from the - // remote node. - bundles->second.erase(bundle_iter); - if (bundles->second.empty()) { - node_to_bundles_when_leasing_.erase(bundles); - } callback(result); }); } @@ -226,6 +242,13 @@ void GcsPlacementGroupScheduler::ReserveResourceFromNode( void GcsPlacementGroupScheduler::CancelResourceReserve( const std::shared_ptr &bundle_spec, const std::shared_ptr &node) { + if (node == nullptr) { + RAY_LOG(WARNING) << "The node for a placement group id " + << bundle_spec->PlacementGroupId() << " and a bundle index, " + << bundle_spec->Index() + << " has already been removed. 
Cancellation request will be ignored."; + return; + } auto node_id = ClientID::FromBinary(node->node_id()); RAY_LOG(INFO) << "Cancelling the resource reserved for bundle: " << bundle_spec->DebugString() << " at node " << node_id; @@ -253,13 +276,64 @@ GcsPlacementGroupScheduler::GetOrConnectLeaseClient(const rpc::Address &raylet_a return iter->second; } +void GcsPlacementGroupScheduler::OnAllBundleSchedulingRequestReturned( + const std::shared_ptr &placement_group, + const std::vector> &bundles, + const std::shared_ptr &bundle_locations, + const std::function)> + &schedule_failure_handler, + const std::function)> + &schedule_success_handler) { + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + RAY_CHECK( + placement_group_to_bundle_location_.emplace(placement_group_id, bundle_locations) + .second); + + if (placement_group_leasing_in_progress_.find(placement_group_id) == + placement_group_leasing_in_progress_.end() || + bundle_locations->size() != bundles.size()) { + // If the lease request has been already cancelled + // or not every lease request succeeds. + DestroyPlacementGroupBundleResourcesIfExists(placement_group_id); + schedule_failure_handler(placement_group); + } else { + // If we successfully created placement group, store them to GCS. + rpc::ScheduleData data; + for (const auto &iter : bundles) { + // TODO(ekl) this is a hack to get a string key for the proto + auto key = iter->PlacementGroupId().Hex() + "_" + std::to_string(iter->Index()); + data.mutable_schedule_plan()->insert( + {key, (*bundle_locations)[iter->BundleId()].first.Binary()}); + } + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupScheduleTable().Put( + placement_group_id, data, + [schedule_success_handler, placement_group](Status status) { + schedule_success_handler(placement_group); + })); + // Update `node_to_leased_bundles_`. 
+ for (const auto &iter : *bundle_locations) { + const auto &location = iter.second; + const auto &bundle_sepc = location.second; + node_to_leased_bundles_[location.first].emplace(bundle_sepc->BundleId(), + bundle_sepc); + } + } + // Erase leasing in progress placement group. + // This could've been removed if the leasing request is cancelled already. + auto it = placement_group_leasing_in_progress_.find(placement_group_id); + if (it != placement_group_leasing_in_progress_.end()) { + placement_group_leasing_in_progress_.erase(it); + } +} + std::unique_ptr GcsPlacementGroupScheduler::GetScheduleContext() { // TODO(ffbin): We will add listener to the GCS node manager to handle node deletion. auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes(); for (const auto &iter : alive_nodes) { if (!node_to_leased_bundles_.contains(iter.first)) { node_to_leased_bundles_.emplace( - iter.first, std::vector>()); + iter.first, + absl::flat_hash_map>()); } } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h index 7d44874fe..f33c64d40 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h @@ -38,6 +38,8 @@ struct pair_hash { } }; using ScheduleMap = std::unordered_map; +using BundleLocations = std::unordered_map< + BundleID, std::pair>, pair_hash>; class GcsPlacementGroup; class GcsPlacementGroupSchedulerInterface { @@ -51,6 +53,13 @@ class GcsPlacementGroupSchedulerInterface { std::function)> schedule_success_handler) = 0; + /// Destroy bundle resources from all nodes in the placement group. + virtual void DestroyPlacementGroupBundleResourcesIfExists( + const PlacementGroupID &placement_group_id) = 0; + + /// Mark the placement group schedule as cancelled. Cancelled bundles will be destroyed. 
+ virtual void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) = 0; + virtual ~GcsPlacementGroupSchedulerInterface() {} }; @@ -128,6 +137,20 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { std::function)> failure_handler, std::function)> success_handler) override; + /// Destroy bundle resources from all nodes in the placement group. + /// This doesn't do anything if bundles are already destroyed. + /// + /// \param placement_group_id The id of a placement group to destroy all bundle + /// resources. + void DestroyPlacementGroupBundleResourcesIfExists( + const PlacementGroupID &placement_group_id) override; + + /// Mark the placement group schedule as cancelled. + /// Cancelled bundles will be destroyed. + /// \param placement_group_id The id of a placement group to mark that scheduling is + /// cancelled. + void MarkScheduleCancelled(const PlacementGroupID &placement_group_id) override; + protected: /// Lease resource from the specified node for the specified bundle. void ReserveResourceFromNode(const std::shared_ptr &bundle, @@ -145,34 +168,59 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface { std::shared_ptr GetOrConnectLeaseClient( const rpc::Address &raylet_address); + void OnAllBundleSchedulingRequestReturned( + const std::shared_ptr &placement_group, + const std::vector> &bundles, + const std::shared_ptr &bundle_locations, + const std::function)> + &schedule_failure_handler, + const std::function)> + &schedule_success_handler); + /// Generate schedule conetext. std::unique_ptr GetScheduleContext(); /// A timer that ticks every cancel resource failure milliseconds. boost::asio::deadline_timer return_timer_; /// Used to update placement group information upon creation, deletion, etc. + std::shared_ptr gcs_table_storage_; + /// Reference of GcsNodeManager. 
const GcsNodeManager &gcs_node_manager_; + /// The cached node clients which are used to communicate with raylet to lease workers. absl::flat_hash_map> remote_lease_clients_; + /// Factory for producing new clients to request leases from remote nodes. ReserveResourceClientFactoryFn lease_client_factory_; /// Map from node ID to the set of bundles for whom we are trying to acquire a lease /// from that node. This is needed so that we can retry lease requests from the node /// until we receive a reply or the node is removed. + /// TODO(sang): We don't currently handle retry. absl::flat_hash_map> node_to_bundles_when_leasing_; /// Map from node ID to the set of bundles. This is needed so that we can reschedule /// bundles when a node is dead. - absl::flat_hash_map>> + absl::flat_hash_map>> node_to_leased_bundles_; /// A vector to store all the schedule strategy. std::vector> scheduler_strategies_; + + /// Set of placement group that have lease requests in flight to nodes. + /// It is required to know if placement group has been removed or not. + absl::flat_hash_set placement_group_leasing_in_progress_; + + /// A map from placement group id to bundle locations. + /// It is used to destroy bundles for the placement group. + /// NOTE: It is a reverse index of `node_to_leased_bundles`. 
+ absl::flat_hash_map> + placement_group_to_bundle_location_; }; } // namespace gcs diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 771f719b5..a5f1afc1d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -34,6 +34,11 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf placement_groups.push_back(placement_group); } + MOCK_METHOD1(DestroyPlacementGroupBundleResourcesIfExists, + void(const PlacementGroupID &placement_group_id)); + + MOCK_METHOD1(MarkScheduleCancelled, void(const PlacementGroupID &placement_group_id)); + std::vector> placement_groups; }; @@ -47,6 +52,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { } void SetUp() override { + // mock_placement_group_scheduler_.reset(new MockPlacementGroupScheduler()); thread_io_service_.reset(new std::thread([this] { std::unique_ptr work( new boost::asio::io_service::work(io_service_)); @@ -72,8 +78,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestBasic) { auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, - [&finished_placement_group_count]() { ++finished_placement_group_count; }); + create_placement_group_request, [&finished_placement_group_count](Status status) { + ++finished_placement_group_count; + }); ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); @@ -81,15 +88,16 @@ TEST_F(GcsPlacementGroupManagerTest, TestBasic) { gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); WaitForExpectedCount(finished_placement_group_count, 1); - 
ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); } TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, - [&finished_placement_group_count]() { ++finished_placement_group_count; }); + create_placement_group_request, [&finished_placement_group_count](Status status) { + ++finished_placement_group_count; + }); ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); @@ -102,10 +110,10 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulingFailed) { mock_placement_group_scheduler_->placement_groups.clear(); ASSERT_EQ(finished_placement_group_count, 0); - // Check that the placement_group is in state `ALIVE`. + // Check that the placement_group is in state `CREATED`. 
gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); WaitForExpectedCount(finished_placement_group_count, 1); - ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); } TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { @@ -113,8 +121,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { Mocker::GenCreatePlacementGroupRequest("test_name"); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, - [&finished_placement_group_count]() { ++finished_placement_group_count; }); + create_placement_group_request, [&finished_placement_group_count](Status status) { + ++finished_placement_group_count; + }); ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); @@ -123,7 +132,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestGetPlacementGroupIDByName) { gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); WaitForExpectedCount(finished_placement_group_count, 1); - ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::ALIVE); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); ASSERT_EQ( gcs_placement_group_manager_->GetPlacementGroupIDByName("test_name"), PlacementGroupID::FromBinary( @@ -134,8 +143,9 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) { auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); std::atomic finished_placement_group_count(0); gcs_placement_group_manager_->RegisterPlacementGroup( - create_placement_group_request, - [&finished_placement_group_count]() { ++finished_placement_group_count; }); + create_placement_group_request, [&finished_placement_group_count](Status status) { + ++finished_placement_group_count; + }); 
ASSERT_EQ(finished_placement_group_count, 0); ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); @@ -149,6 +159,123 @@ TEST_F(GcsPlacementGroupManagerTest, TestRescheduleWhenNodeAdd) { EXPECT_TRUE(WaitForCondition(condition, 10 * 1000)); } +TEST_F(GcsPlacementGroupManagerTest, TestRemovingPendingPlacementGroup) { + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + std::atomic finished_placement_group_count(0); + std::atomic failed_placement_group_count(0); + gcs_placement_group_manager_->RegisterPlacementGroup( + create_placement_group_request, + [&finished_placement_group_count, &failed_placement_group_count](Status status) { + if (status.ok()) { + ++finished_placement_group_count; + } else { + ++failed_placement_group_count; + } + }); + + ASSERT_EQ(finished_placement_group_count, 0); + ASSERT_EQ(failed_placement_group_count, 0); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); + mock_placement_group_scheduler_->placement_groups.clear(); + + gcs_placement_group_manager_->OnPlacementGroupCreationFailed(placement_group); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id, + [](Status status) {}); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED); + + // Make sure it is not rescheduled + gcs_placement_group_manager_->SchedulePendingPlacementGroups(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0); + mock_placement_group_scheduler_->placement_groups.clear(); + WaitForExpectedCount(finished_placement_group_count, 0); + WaitForExpectedCount(failed_placement_group_count, 1); + + // Make sure we can 
re-remove again. + gcs_placement_group_manager_->RemovePlacementGroup( + placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); +} + +TEST_F(GcsPlacementGroupManagerTest, TestRemovingLeasingPlacementGroup) { + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + std::atomic finished_placement_group_count(0); + std::atomic failed_placement_group_count(0); + gcs_placement_group_manager_->RegisterPlacementGroup( + create_placement_group_request, + [&finished_placement_group_count, &failed_placement_group_count](Status status) { + if (status.ok()) { + ++finished_placement_group_count; + } else { + ++failed_placement_group_count; + } + }); + + ASSERT_EQ(finished_placement_group_count, 0); + ASSERT_EQ(failed_placement_group_count, 0); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 1); + auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); + mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::PENDING); + + // Placement group is in leasing state. + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + EXPECT_CALL(*mock_placement_group_scheduler_, MarkScheduleCancelled(placement_group_id)) + .Times(1); + gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id, + [](Status status) {}); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED); + + // Make sure it is not rescheduled + gcs_placement_group_manager_->SchedulePendingPlacementGroups(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0); + mock_placement_group_scheduler_->placement_groups.clear(); + WaitForExpectedCount(finished_placement_group_count, 0); + WaitForExpectedCount(failed_placement_group_count, 1); + + // Make sure we can re-remove again. 
+ gcs_placement_group_manager_->RemovePlacementGroup( + placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); +} + +TEST_F(GcsPlacementGroupManagerTest, TestRemovingCreatedPlacementGroup) { + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + std::atomic finished_placement_group_count(0); + gcs_placement_group_manager_->RegisterPlacementGroup( + create_placement_group_request, [&finished_placement_group_count](Status status) { + if (status.ok()) { + ++finished_placement_group_count; + } + }); + auto placement_group = mock_placement_group_scheduler_->placement_groups.back(); + mock_placement_group_scheduler_->placement_groups.pop_back(); + + gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group); + WaitForExpectedCount(finished_placement_group_count, 1); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED); + + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + EXPECT_CALL(*mock_placement_group_scheduler_, + DestroyPlacementGroupBundleResourcesIfExists(placement_group_id)) + .Times(1); + EXPECT_CALL(*mock_placement_group_scheduler_, MarkScheduleCancelled(placement_group_id)) + .Times(0); + gcs_placement_group_manager_->RemovePlacementGroup(placement_group_id, + [](Status status) {}); + ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::REMOVED); + + // Make sure it is not rescheduled + gcs_placement_group_manager_->SchedulePendingPlacementGroups(); + ASSERT_EQ(mock_placement_group_scheduler_->placement_groups.size(), 0); + mock_placement_group_scheduler_->placement_groups.clear(); + ASSERT_EQ(finished_placement_group_count, 1); + + // Make sure we can re-remove again. 
+ gcs_placement_group_manager_->RemovePlacementGroup( + placement_group_id, [](Status status) { ASSERT_TRUE(status.ok()); }); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 33448b376..4eca5d985 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -305,13 +305,87 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestStrictPackStrategyResourceCheck) { // requirement. In this case, the bundles should be scheduled on Node0. auto node1 = Mocker::GenNodeInfo(1); AddNode(node1, 1); - gcs_placement_group_scheduler_->Schedule(placement_group, failure_handler, + auto create_placement_group_request2 = + Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::STRICT_PACK); + auto placement_group2 = + std::make_shared(create_placement_group_request2); + gcs_placement_group_scheduler_->Schedule(placement_group2, failure_handler, success_handler); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); ASSERT_TRUE(raylet_client_->GrantResourceReserve()); WaitPendingDone(success_placement_groups_, 2); } +TEST_F(GcsPlacementGroupSchedulerTest, DestroyPlacementGroup) { + auto node = Mocker::GenNodeInfo(); + AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = + std::make_shared(create_placement_group_request); + + // Schedule the placement_group with 1 available node, and the lease request should be + // send to the node. 
+ gcs_placement_group_scheduler_->Schedule( + placement_group, + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }, + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + WaitPendingDone(failure_placement_groups_, 0); + WaitPendingDone(success_placement_groups_, 1); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists( + placement_group_id); + ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); + + // Subsequent destroy request should not do anything. + gcs_placement_group_scheduler_->DestroyPlacementGroupBundleResourcesIfExists( + placement_group_id); + ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve()); + ASSERT_FALSE(raylet_client_->GrantCancelResourceReserve()); +} + +TEST_F(GcsPlacementGroupSchedulerTest, DestroyCancelledPlacementGroup) { + auto node = Mocker::GenNodeInfo(); + AddNode(node); + ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size()); + + auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest(); + auto placement_group = + std::make_shared(create_placement_group_request); + const auto &placement_group_id = placement_group->GetPlacementGroupID(); + + // Schedule the placement_group with 1 available node, and the lease request should be + // send to the node. 
+ gcs_placement_group_scheduler_->Schedule( + placement_group, + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + failure_placement_groups_.emplace_back(std::move(placement_group)); + }, + [this](std::shared_ptr placement_group) { + absl::MutexLock lock(&vector_mutex_); + success_placement_groups_.emplace_back(std::move(placement_group)); + }); + + // Now, cancel the schedule request. + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + gcs_placement_group_scheduler_->MarkScheduleCancelled(placement_group_id); + ASSERT_TRUE(raylet_client_->GrantResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); + ASSERT_TRUE(raylet_client_->GrantCancelResourceReserve()); + WaitPendingDone(failure_placement_groups_, 1); +} + TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyReschedulingWhenNodeAdd) { ReschedulingWhenNodeAddTest(rpc::PlacementStrategy::PACK); } diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 088ca0c32..73020e90c 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -189,6 +189,20 @@ struct GcsServerMocker { } } + // Trigger reply to CancelResourceReserve. 
+ bool GrantCancelResourceReserve(bool success = true) { + Status status = Status::OK(); + rpc::CancelResourceReserveReply reply; + if (return_callbacks.size() == 0) { + return false; + } else { + auto callback = return_callbacks.front(); + callback(status, reply); + return_callbacks.pop_front(); + return true; + } + } + ~MockRayletResourceClient() {} int num_lease_requested = 0; diff --git a/src/ray/gcs/redis_accessor.cc b/src/ray/gcs/redis_accessor.cc index 08ce4c3a5..b3cebaab1 100644 --- a/src/ray/gcs/redis_accessor.cc +++ b/src/ray/gcs/redis_accessor.cc @@ -833,6 +833,11 @@ Status RedisPlacementGroupInfoAccessor::AsyncCreatePlacementGroup( return Status::Invalid("Not implemented"); } +Status RedisPlacementGroupInfoAccessor::AsyncRemovePlacementGroup( + const PlacementGroupID &placement_group_id, const StatusCallback &callback) { + return Status::Invalid("Not implemented"); +} + Status RedisPlacementGroupInfoAccessor::AsyncGet( const PlacementGroupID &placement_group_id, const OptionalItemCallback &callback) { diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index da2229dfb..2dadee599 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -479,6 +479,9 @@ class RedisPlacementGroupInfoAccessor : public PlacementGroupInfoAccessor { Status AsyncCreatePlacementGroup( const PlacementGroupSpecification &placement_group_spec) override; + Status AsyncRemovePlacementGroup(const PlacementGroupID &placement_group_id, + const StatusCallback &callback) override; + Status AsyncGet( const PlacementGroupID &placement_group_id, const OptionalItemCallback &callback) override; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 12eb6cd79..7b003ed35 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -165,10 +165,10 @@ message PlacementGroupTableData { enum PlacementGroupState { // Placement Group is pending or scheduling PENDING = 0; - // Placement Group is alive. 
- ALIVE = 1; - // Placement Group is already dead and won't be reschedule. - DEAD = 2; + // Placement Group is created. + CREATED = 1; + // Placement Group is already removed and won't be rescheduled. + REMOVED = 2; } // ID of the PlacementGroup. diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index 24796b495..141e2b526 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -169,6 +169,11 @@ service PlacementGroupInfoGcsService { // Create placement group via gcs service. rpc CreatePlacementGroup(CreatePlacementGroupRequest) returns (CreatePlacementGroupReply); + + // Remove placement group via gcs service. + rpc RemovePlacementGroup(RemovePlacementGroupRequest) + returns (RemovePlacementGroupReply); + // Get placement group information via gcs service. rpc GetPlacementGroup(GetPlacementGroupRequest) returns (GetPlacementGroupReply); } @@ -499,6 +504,14 @@ message CreatePlacementGroupReply { GcsStatus status = 1; } +message RemovePlacementGroupRequest { + bytes placement_group_id = 1; +} + +message RemovePlacementGroupReply { + GcsStatus status = 1; +} + message GetPlacementGroupRequest { bytes placement_group_id = 1; } diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 6933101cc..37b5557f3 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -251,6 +251,9 @@ class GcsRpcClient { VOID_GCS_RPC_CLIENT_METHOD(PlacementGroupInfoGcsService, CreatePlacementGroup, placement_group_info_grpc_client_, ) + /// Remove placement group via GCS Service. + VOID_GCS_RPC_CLIENT_METHOD(PlacementGroupInfoGcsService, RemovePlacementGroup, + placement_group_info_grpc_client_, ) /// Get placement group via GCS Service. 
VOID_GCS_RPC_CLIENT_METHOD(PlacementGroupInfoGcsService, GetPlacementGroup, placement_group_info_grpc_client_, ) diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 628e74b39..dc15a35c2 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -457,6 +457,10 @@ class PlacementGroupInfoGcsServiceHandler { CreatePlacementGroupReply *reply, SendReplyCallback send_reply_callback) = 0; + virtual void HandleRemovePlacementGroup(const RemovePlacementGroupRequest &request, + RemovePlacementGroupReply *reply, + SendReplyCallback send_reply_callback) = 0; + virtual void HandleGetPlacementGroup(const GetPlacementGroupRequest &request, GetPlacementGroupReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -479,6 +483,7 @@ class PlacementGroupInfoGrpcService : public GrpcService { const std::unique_ptr &cq, std::vector> *server_call_factories) override { PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(CreatePlacementGroup); + PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(RemovePlacementGroup); PLACEMENT_GROUP_INFO_SERVICE_RPC_HANDLER(GetPlacementGroup); }