From 2e41a29c8fe3405fee1e56d9bc82b54ede0e4b62 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 28 Sep 2020 16:56:43 +0800 Subject: [PATCH] [Placement Group]Support placement group request processing idempotent in raylet (#10998) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add part code * fix review comment * fix review comment * fix review comment Co-authored-by: 灵洵 --- src/ray/raylet/node_manager.cc | 80 +++++++++++++++++++++++----------- src/ray/raylet/node_manager.h | 6 +++ 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index da9d72c8e..8b42e5efa 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1844,7 +1844,6 @@ void NodeManager::HandleCancelResourceReserve( auto bundle_spec = BundleSpecification(request.bundle_spec()); RAY_LOG(INFO) << "Request to cancel reserved resource is received, " << bundle_spec.DebugString(); - const auto &resource_set = bundle_spec.GetRequiredResources(); // Kill all workers that are currently associated with the placement group. std::vector> workers_associated_with_pg; @@ -1870,29 +1869,13 @@ void NodeManager::HandleCancelResourceReserve( KillWorker(worker); } - // We should commit resources if it weren't because - // ReturnBundleResources requires resources to be committed when it is called. - auto it = bundle_state_map_.find(bundle_spec.BundleId()); - RAY_CHECK(it != bundle_state_map_.end()) - << "Cancel requests are received to raylet although it hasn't received any prepare " - "or commit requests. This must be an anomaly."; - const auto &bundle_state = it->second; - if (bundle_state->state == CommitState::PREPARED) { - CommitBundle(cluster_resource_map_, bundle_spec); + // Return bundle resources. + if (ReturnBundleResources(bundle_spec)) { + // Call task dispatch to assign work to the released resources. + TryLocalInfeasibleTaskScheduling(); + DispatchTasks(local_queues_.GetReadyTasksByClass()); } - bundle_state_map_.erase(it); - - // Return resources. - for (auto resource : resource_set.GetResourceMap()) { - local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(), - bundle_spec.Index(), resource.first); - } - cluster_resource_map_[self_node_id_].ReturnBundleResources( - bundle_spec.PlacementGroupId(), bundle_spec.Index()); send_reply_callback(Status::OK(), nullptr, nullptr); - // Call task dispatch to assign work to the released resources. - TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); } void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, @@ -2040,6 +2023,24 @@ void NodeManager::ProcessSetResourceRequest( bool NodeManager::PrepareBundle( std::unordered_map &resource_map, const BundleSpecification &bundle_spec) { + // We will first delete the existing bundle to ensure idempotent. + // The reason why we do this is: after GCS restarts, placement group can be rescheduled + // directly without rolling back the operations performed before the restart. + const auto &bundle_id = bundle_spec.BundleId(); + auto iter = bundle_state_map_.find(bundle_id); + if (iter != bundle_state_map_.end()) { + if (iter->second->state == CommitState::COMMITTED) { + // If the bundle state is already committed, it means that prepare request is just + // stale. + RAY_LOG(INFO) << "Duplicate prepare bundle request, skip it directly."; + return true; + } else { + // If there was a bundle in prepare state, it already locked resources, we will + // return bundle resources. + ReturnBundleResources(bundle_spec); + } + } + // TODO(sang): It is currently not idempotent because we don't retry. Make it idempotent // once retry is implemented. If the resource map contains the local raylet, update load // before calling policy. @@ -2053,7 +2054,6 @@ bool NodeManager::PrepareBundle( auto bundle_state = std::make_shared(); if (reserve_resource_success) { // Register states. - const auto &bundle_id = bundle_spec.BundleId(); auto it = bundle_state_map_.find(bundle_id); // Same bundle cannot be rescheduled. RAY_CHECK(it == bundle_state_map_.end()); @@ -2081,8 +2081,13 @@ void NodeManager::CommitBundle( const auto &bundle_id = bundle_spec.BundleId(); auto it = bundle_state_map_.find(bundle_id); // When bundle is committed, it should've been prepared already. - // We don't need this check if commit becomes idempotent. - RAY_CHECK(it != bundle_state_map_.end()); + // If GCS call `CommitBundleResources` after `CancelResourceReserve`, we will skip it + // directly. + if (it == bundle_state_map_.end()) { + RAY_LOG(INFO) << "The bundle has been cancelled. Skip it directly. Bundle info is " + << bundle_spec.DebugString(); + return; + } const auto &bundle_state = it->second; bundle_state->state = CommitState::COMMITTED; const auto &acquired_resources = bundle_state->acquired_resources; @@ -3496,6 +3501,31 @@ void NodeManager::RecordMetrics() { stats::DeadActors().Record(statistical_data.dead_actors); } +bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) { + // We should commit resources if it weren't because + // ReturnBundleResources requires resources to be committed when it is called. + auto it = bundle_state_map_.find(bundle_spec.BundleId()); + if (it == bundle_state_map_.end()) { + RAY_LOG(INFO) << "Duplicate cancel request, skip it directly."; + return false; + } + const auto &bundle_state = it->second; + if (bundle_state->state == CommitState::PREPARED) { + CommitBundle(cluster_resource_map_, bundle_spec); + } + bundle_state_map_.erase(it); + + // Return resources. + const auto &resource_set = bundle_spec.GetRequiredResources(); + for (const auto &resource : resource_set.GetResourceMap()) { + local_available_resources_.ReturnBundleResources(bundle_spec.PlacementGroupId(), + bundle_spec.Index(), resource.first); + } + cluster_resource_map_[self_node_id_].ReturnBundleResources( + bundle_spec.PlacementGroupId(), bundle_spec.Index()); + return true; +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d1a720219..d72a071ab 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -680,6 +680,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Whether a task is an actor creation task. bool IsActorCreationTask(const TaskID &task_id); + /// Return back all the bundle resource. + /// + /// \param bundle_spec: Specification of bundle whose resources will be returned. + /// \return Whether the resource is returned successfully. + bool ReturnBundleResources(const BundleSpecification &bundle_spec); + /// ID of this node. NodeID self_node_id_; boost::asio::io_service &io_service_;