From 1ae4d2873af5c1815e702fbb2b8bc2c6ad29ec0e Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Tue, 24 Nov 2020 21:11:18 +0800 Subject: [PATCH] [GCS] refactor gcs initialization (#11890) --- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 125 ++++----- src/ray/gcs/gcs_server/gcs_actor_manager.h | 7 +- src/ray/gcs/gcs_server/gcs_init_data.cc | 120 ++++++++ src/ray/gcs/gcs_server/gcs_init_data.h | 129 +++++++++ src/ray/gcs/gcs_server/gcs_node_manager.cc | 50 ++-- src/ray/gcs/gcs_server/gcs_node_manager.h | 7 +- src/ray/gcs/gcs_server/gcs_object_manager.cc | 25 +- src/ray/gcs/gcs_server/gcs_object_manager.h | 7 +- .../gcs_server/gcs_placement_group_manager.cc | 48 ++-- .../gcs_server/gcs_placement_group_manager.h | 7 +- src/ray/gcs/gcs_server/gcs_server.cc | 264 +++++++++--------- src/ray/gcs/gcs_server/gcs_server.h | 43 ++- src/ray/gcs/gcs_server/gcs_worker_manager.cc | 10 + src/ray/gcs/gcs_server/gcs_worker_manager.h | 5 + 14 files changed, 546 insertions(+), 301 deletions(-) create mode 100644 src/ray/gcs/gcs_server/gcs_init_data.cc create mode 100644 src/ray/gcs/gcs_server/gcs_init_data.h diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 3ba2ca0ac..4c15c689f 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -945,76 +945,71 @@ void GcsActorManager::SchedulePendingActors() { } } -void GcsActorManager::LoadInitialData(const EmptyCallback &done) { - RAY_LOG(INFO) << "Loading initial data."; - auto callback = [this, - done](const std::unordered_map &result) { - std::unordered_map> node_to_workers; - for (auto &item : result) { - auto actor = std::make_shared(item.second); - if (item.second.state() != ray::rpc::ActorTableData::DEAD) { - registered_actors_.emplace(item.first, actor); +void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) { + const auto &jobs = gcs_init_data.Jobs(); + std::unordered_map> node_to_workers; + for (const auto &entry : gcs_init_data.Actors()) { + auto job_iter = jobs.find(entry.first.JobId()); + auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead()); + auto actor = std::make_shared(entry.second); + if (entry.second.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) { + registered_actors_.emplace(entry.first, actor); - if (!actor->GetName().empty()) { - named_actors_.emplace(actor->GetName(), actor->GetActorID()); - } - - if (item.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { - const auto &owner = actor->GetOwnerAddress(); - const auto &owner_node = NodeID::FromBinary(owner.raylet_id()); - const auto &owner_worker = WorkerID::FromBinary(owner.worker_id()); - RAY_CHECK(unresolved_actors_[owner_node][owner_worker] - .emplace(actor->GetActorID()) - .second); - } else if (item.second.state() == ray::rpc::ActorTableData::ALIVE) { - created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), - actor->GetActorID()); - } - - if (!actor->IsDetached()) { - // This actor is owned. Send a long polling request to the actor's - // owner to determine when the actor should be removed. - PollOwnerForActorOutOfScope(actor); - } - - if (!actor->GetWorkerID().IsNil()) { - RAY_CHECK(!actor->GetNodeID().IsNil()); - node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID()); - } - } else { - destroyed_actors_.emplace(item.first, actor); - sorted_destroyed_actor_list_.emplace_back(item.first, - (int64_t)item.second.timestamp()); + if (!actor->GetName().empty()) { + named_actors_.emplace(actor->GetName(), actor->GetActorID()); } - } - sorted_destroyed_actor_list_.sort([](const std::pair &left, - const std::pair &right) { - return left.second < right.second; - }); - // Notify raylets to release unused workers. - gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers); - - RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size() - << ", and the number of created actors is " << created_actors_.size(); - for (auto &item : registered_actors_) { - auto &actor = item.second; - if (actor->GetState() == ray::rpc::ActorTableData::PENDING_CREATION || - actor->GetState() == ray::rpc::ActorTableData::RESTARTING) { - // We should not reschedule actors in state of `ALIVE`. - // We could not reschedule actors in state of `DEPENDENCIES_UNREADY` because the - // dependencies of them may not have been resolved yet. - RAY_LOG(INFO) << "Rescheduling a non-alive actor, actor id = " - << actor->GetActorID() << ", state = " << actor->GetState() - << ", job id = " << actor->GetActorID().JobId(); - gcs_actor_scheduler_->Reschedule(actor); + if (entry.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) { + const auto &owner = actor->GetOwnerAddress(); + const auto &owner_node = NodeID::FromBinary(owner.raylet_id()); + const auto &owner_worker = WorkerID::FromBinary(owner.worker_id()); + RAY_CHECK(unresolved_actors_[owner_node][owner_worker] + .emplace(actor->GetActorID()) + .second); + } else if (entry.second.state() == ray::rpc::ActorTableData::ALIVE) { + created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(), + actor->GetActorID()); } - } - RAY_LOG(INFO) << "Finished loading initial data."; - done(); - }; - RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetAll(callback)); + if (!actor->IsDetached()) { + // This actor is owned. Send a long polling request to the actor's + // owner to determine when the actor should be removed. + PollOwnerForActorOutOfScope(actor); + } + + if (!actor->GetWorkerID().IsNil()) { + RAY_CHECK(!actor->GetNodeID().IsNil()); + node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID()); + } + } else { + destroyed_actors_.emplace(entry.first, actor); + sorted_destroyed_actor_list_.emplace_back(entry.first, + (int64_t)entry.second.timestamp()); + } + } + sorted_destroyed_actor_list_.sort([](const std::pair &left, + const std::pair &right) { + return left.second < right.second; + }); + + // Notify raylets to release unused workers. + gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers); + + RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size() + << ", and the number of created actors is " << created_actors_.size(); + for (auto &item : registered_actors_) { + auto &actor = item.second; + if (actor->GetState() == ray::rpc::ActorTableData::PENDING_CREATION || + actor->GetState() == ray::rpc::ActorTableData::RESTARTING) { + // We should not reschedule actors in state of `ALIVE`. + // We could not reschedule actors in state of `DEPENDENCIES_UNREADY` because the + // dependencies of them may not have been resolved yet. + RAY_LOG(INFO) << "Rescheduling a non-alive actor, actor id = " + << actor->GetActorID() << ", state = " << actor->GetState() + << ", job id = " << actor->GetActorID().JobId(); + gcs_actor_scheduler_->Reschedule(actor); + } + } } void GcsActorManager::OnJobFinished(const JobID &job_id) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 7731f4eec..ecc6f279a 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -21,6 +21,7 @@ #include "ray/common/task/task_execution_spec.h" #include "ray/common/task/task_spec.h" #include "ray/gcs/gcs_server/gcs_actor_scheduler.h" +#include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/redis_gcs_client.h" @@ -275,11 +276,11 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// \param actor The actor that has been created. void OnActorCreationSuccess(const std::shared_ptr &actor); - /// Load initial data from gcs storage to memory cache asynchronously. + /// Initialize with the gcs tables data synchronously. /// This should be called when GCS server restarts after a failure. /// - /// \param done Callback that will be called when load is complete. - void LoadInitialData(const EmptyCallback &done); + /// \param gcs_init_data. + void Initialize(const GcsInitData &gcs_init_data); /// Delete non-detached actor information from durable storage once the associated job /// finishes. diff --git a/src/ray/gcs/gcs_server/gcs_init_data.cc b/src/ray/gcs/gcs_server/gcs_init_data.cc new file mode 100644 index 000000000..78d089c97 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_init_data.cc @@ -0,0 +1,120 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/gcs/gcs_server/gcs_init_data.h" + +namespace ray { +namespace gcs { +void GcsInitData::AsyncLoad(const EmptyCallback &on_done) { + // There are 6 kinds of table data need to be loaded. + auto count_down = std::make_shared(6); + auto on_load_finished = [count_down, on_done] { + if (--(*count_down) == 0) { + if (on_done) { + on_done(); + } + } + }; + + AsyncLoadJobTableData(on_load_finished); + + AsyncLoadNodeTableData(on_load_finished); + + AsyncLoadObjectTableData(on_load_finished); + + AsyncLoadResourceTableData(on_load_finished); + + AsyncLoadActorTableData(on_load_finished); + + AsyncLoadPlacementGroupTableData(on_load_finished); +} + +void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading job table data."; + auto load_job_table_data_callback = + [this, on_done](const std::unordered_map &result) { + job_table_data_ = result; + RAY_LOG(INFO) << "Finished loading job table data, size = " + << job_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(load_job_table_data_callback)); +} + +void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading node table data."; + auto load_node_table_data_callback = + [this, on_done](const std::unordered_map &result) { + node_table_data_ = result; + RAY_LOG(INFO) << "Finished loading node table data, size = " + << node_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(load_node_table_data_callback)); +} + +void GcsInitData::AsyncLoadObjectTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading object table data."; + auto load_object_table_data_callback = + [this, + on_done](const std::unordered_map &result) { + object_table_data_ = result; + RAY_LOG(INFO) << "Finished loading object table data, size = " + << object_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(load_object_table_data_callback)); +} + +void GcsInitData::AsyncLoadResourceTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading cluster resources table data."; + auto load_resource_table_data_callback = + [this, on_done](const std::unordered_map &result) { + resource_table_data_ = result; + RAY_LOG(INFO) << "Finished loading cluster resources table data, size = " + << resource_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK( + gcs_table_storage_->NodeResourceTable().GetAll(load_resource_table_data_callback)); +} + +void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading placement group table data."; + auto load_placement_group_table_data_callback = + [this, on_done](const std::unordered_map &result) { + placement_group_table_data_ = result; + RAY_LOG(INFO) << "Finished loading placement group table data, size = " + << placement_group_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll( + load_placement_group_table_data_callback)); +} + +void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) { + RAY_LOG(INFO) << "Loading actor table data."; + auto load_actor_table_data_callback = + [this, on_done](const std::unordered_map &result) { + actor_table_data_ = result; + RAY_LOG(INFO) << "Finished loading actor table data, size = " + << actor_table_data_.size(); + on_done(); + }; + RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetAll(load_actor_table_data_callback)); +} + +} // namespace gcs +} // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/gcs_init_data.h b/src/ray/gcs/gcs_server/gcs_init_data.h new file mode 100644 index 000000000..b5707f282 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_init_data.h @@ -0,0 +1,129 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/common/id.h" +#include "ray/gcs/callback.h" +#include "ray/gcs/gcs_server/gcs_table_storage.h" +#include "src/ray/protobuf/gcs.pb.h" + +namespace ray { +namespace gcs { + +/// `GcsInitData` is used to initialize all modules which need to recovery status when GCS +/// server restarts. +/// It loads all required metadata from the store into memory at once, so that the next +/// initialization process can be synchronized. +class GcsInitData { + public: + /// Create a GcsInitData. + /// + /// \param gcs_table_storage The storage from which the metadata will be loaded. + explicit GcsInitData(std::shared_ptr gcs_table_storage) + : gcs_table_storage_(std::move(gcs_table_storage)) {} + + /// Load all required metadata from the store into memory at once asynchronously. + /// + /// \param on_done The callback when all metadatas are loaded successfully. + void AsyncLoad(const EmptyCallback &on_done); + + /// Get job metadata. + const std::unordered_map &Jobs() const { + return job_table_data_; + } + + /// Get node metadata. + const std::unordered_map &Nodes() const { + return node_table_data_; + } + + /// Get object location metadata. + const std::unordered_map &Objects() const { + return object_table_data_; + } + + /// Get resource metadata. + const std::unordered_map &ClusterResources() const { + return resource_table_data_; + } + + /// Get actor metadata. + const std::unordered_map &Actors() const { + return actor_table_data_; + } + + /// Get placement group metadata. + const std::unordered_map + &PlacementGroups() const { + return placement_group_table_data_; + } + + private: + /// Load job metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when job metadata is loaded successfully. + void AsyncLoadJobTableData(const EmptyCallback &on_done); + + /// Load node metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when node metadata is loaded successfully. + void AsyncLoadNodeTableData(const EmptyCallback &on_done); + + /// Load object locations metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when object location metadata is loaded successfully. + void AsyncLoadObjectTableData(const EmptyCallback &on_done); + + /// Load resource metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when resource metadata is loaded successfully. + void AsyncLoadResourceTableData(const EmptyCallback &on_done); + + /// Load placement group metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when placement group metadata is loaded successfully. + void AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done); + + /// Load actor metadata from the store into memory asynchronously. + /// + /// \param on_done The callback when actor metadata is loaded successfully. + void AsyncLoadActorTableData(const EmptyCallback &on_done); + + protected: + /// The gcs table storage. + std::shared_ptr gcs_table_storage_; + + /// Job metadata. + std::unordered_map job_table_data_; + + /// Node metadata. + std::unordered_map node_table_data_; + + /// Object location metadata. + std::unordered_map object_table_data_; + + /// Resource metadata. + std::unordered_map resource_table_data_; + + /// Placement group metadata. + std::unordered_map + placement_group_table_data_; + + /// Actor metadata. + std::unordered_map actor_table_data_; +}; + +} // namespace gcs +} // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 8d2577bdb..dd30d3ef8 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -489,40 +489,26 @@ std::shared_ptr GcsNodeManager::RemoveNode( return removed_node; } -void GcsNodeManager::LoadInitialData(const EmptyCallback &done) { - RAY_LOG(INFO) << "Loading initial data."; - - auto get_node_callback = [this, - done](const std::unordered_map &result) { - for (auto &item : result) { - if (item.second.state() == rpc::GcsNodeInfo::ALIVE) { - // Call `AddNode` for this node to make sure it is tracked by the failure - // detector. - AddNode(std::make_shared(item.second)); - } else if (item.second.state() == rpc::GcsNodeInfo::DEAD) { - dead_nodes_.emplace(item.first, std::make_shared(item.second)); - sorted_dead_node_list_.emplace_back(item.first, item.second.timestamp()); - } +void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) { + for (const auto &item : gcs_init_data.Nodes()) { + if (item.second.state() == rpc::GcsNodeInfo::ALIVE) { + // Call `AddNode` for this node to make sure it is tracked by the failure + // detector. + AddNode(std::make_shared(item.second)); + } else if (item.second.state() == rpc::GcsNodeInfo::DEAD) { + dead_nodes_.emplace(item.first, std::make_shared(item.second)); + sorted_dead_node_list_.emplace_back(item.first, item.second.timestamp()); } - sorted_dead_node_list_.sort([](const std::pair &left, - const std::pair &right) { - return left.second < right.second; - }); + } + sorted_dead_node_list_.sort( + [](const std::pair &left, + const std::pair &right) { return left.second < right.second; }); - auto get_node_resource_callback = - [this, done](const std::unordered_map &result) { - for (auto &item : result) { - if (alive_nodes_.count(item.first)) { - cluster_resources_[item.first] = item.second; - } - } - RAY_LOG(INFO) << "Finished loading initial data."; - done(); - }; - RAY_CHECK_OK( - gcs_table_storage_->NodeResourceTable().GetAll(get_node_resource_callback)); - }; - RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(get_node_callback)); + for (auto &entry : gcs_init_data.ClusterResources()) { + if (alive_nodes_.count(entry.first)) { + cluster_resources_[entry.first] = entry.second; + } + } } void GcsNodeManager::StartNodeFailureDetector() { diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index f3887ead7..3e49db4e2 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -18,6 +18,7 @@ #include "absl/container/flat_hash_set.h" #include "ray/common/id.h" #include "ray/gcs/accessor.h" +#include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/rpc/client_call.h" @@ -152,11 +153,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler { node_added_listeners_.emplace_back(std::move(listener)); } - /// Load initial data from gcs storage to memory cache asynchronously. + /// Initialize with the gcs tables data synchronously. /// This should be called when GCS server restarts after a failure. /// - /// \param done Callback that will be called when load is complete. - void LoadInitialData(const EmptyCallback &done); + /// \param gcs_init_data. + void Initialize(const GcsInitData &gcs_init_data); /// Start node failure detector. void StartNodeFailureDetector(); diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.cc b/src/ray/gcs/gcs_server/gcs_object_manager.cc index 1e15987bd..471bc896b 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_object_manager.cc @@ -288,24 +288,17 @@ const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo( return object_data; } -void GcsObjectManager::LoadInitialData(const EmptyCallback &done) { - RAY_LOG(INFO) << "Loading initial data."; - auto callback = [this, - done](const std::unordered_map &result) { - absl::flat_hash_map node_to_objects; - for (auto &item : result) { - for (const auto &loc : item.second.locations()) { - node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first); - } +void GcsObjectManager::Initialize(const GcsInitData &gcs_init_data) { + absl::flat_hash_map node_to_objects; + for (const auto &item : gcs_init_data.Objects()) { + for (const auto &loc : item.second.locations()) { + node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first); } + } - for (auto &item : node_to_objects) { - AddObjectsLocation(item.first, item.second); - } - RAY_LOG(INFO) << "Finished loading initial data."; - done(); - }; - RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(callback)); + for (auto &item : node_to_objects) { + AddObjectsLocation(item.first, item.second); + } } std::string GcsObjectManager::DebugString() const { diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.h b/src/ray/gcs/gcs_server/gcs_object_manager.h index b1602d824..4d728e8e0 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.h +++ b/src/ray/gcs/gcs_server/gcs_object_manager.h @@ -14,6 +14,7 @@ #pragma once +#include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" @@ -53,11 +54,11 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { rpc::RemoveObjectLocationReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Load initial data from gcs storage to memory cache asynchronously. + /// Initialize with the gcs tables data synchronously. /// This should be called when GCS server restarts after a failure. /// - /// \param done Callback that will be called when load is complete. - void LoadInitialData(const EmptyCallback &done); + /// \param gcs_init_data. + void Initialize(const GcsInitData &gcs_init_data); std::string DebugString() const; diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 52519bd90..d6babd48e 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -469,42 +469,34 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() { gcs_node_manager_.UpdatePlacementGroupLoad(move(placement_group_load)); } -void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) { - RAY_LOG(INFO) << "GcsPlacementGroupManager loading initial data."; - auto callback = [this, - done](const std::unordered_map &result) { - std::unordered_map> node_to_bundles; - for (auto &item : result) { - auto placement_group = std::make_shared(item.second); - if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) { - registered_placement_groups_.emplace(item.first, placement_group); +void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) { + std::unordered_map> node_to_bundles; + for (auto &item : gcs_init_data.PlacementGroups()) { + auto placement_group = std::make_shared(item.second); + if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) { + registered_placement_groups_.emplace(item.first, placement_group); - if (item.second.state() == rpc::PlacementGroupTableData::PENDING || - item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { - pending_placement_groups_.emplace_back(std::move(placement_group)); - } + if (item.second.state() == rpc::PlacementGroupTableData::PENDING || + item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { + pending_placement_groups_.emplace_back(std::move(placement_group)); + } - if (item.second.state() == rpc::PlacementGroupTableData::CREATED || - item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { - const auto &bundles = item.second.bundles(); - for (auto &bundle : bundles) { - if (!NodeID::FromBinary(bundle.node_id()).IsNil()) { - node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle); - } + if (item.second.state() == rpc::PlacementGroupTableData::CREATED || + item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) { + const auto &bundles = item.second.bundles(); + for (auto &bundle : bundles) { + if (!NodeID::FromBinary(bundle.node_id()).IsNil()) { + node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle); } } } } + } - // Notify raylets to release unused bundles. - gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles); + // Notify raylets to release unused bundles. + gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles); - SchedulePendingPlacementGroups(); - RAY_LOG(INFO) << "Finished loading initial data."; - done(); - }; - RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(callback)); + SchedulePendingPlacementGroups(); } std::string GcsPlacementGroupManager::DebugString() const { diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index 07a771fb6..eec2048d6 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -20,6 +20,7 @@ #include "ray/common/id.h" #include "ray/common/task/task_execution_spec.h" #include "ray/common/task/task_spec.h" +#include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_node_manager.h" #include "ray/gcs/gcs_server/gcs_placement_group_scheduler.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" @@ -231,11 +232,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// Collect stats from gcs placement group manager in-memory data structures. void CollectStats() const; - /// Load initial data from gcs storage to memory cache asynchronously. + /// Initialize with the gcs tables data synchronously. /// This should be called when GCS server restarts after a failure. /// - /// \param done Callback that will be called when load is complete. - void LoadInitialData(const EmptyCallback &done); + /// \param gcs_init_data. + void Initialize(const GcsInitData &gcs_init_data); std::string DebugString() const; diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 7d5e55069..3d634fe60 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -41,7 +41,16 @@ GcsServer::~GcsServer() { Stop(); } void GcsServer::Start() { // Init backend client. - InitBackendClient(); + GcsClientOptions options(config_.redis_address, config_.redis_port, + config_.redis_password, config_.is_test); + redis_gcs_client_ = std::make_shared(options); + auto status = redis_gcs_client_->Connect(main_service_); + RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status; + + // Init redis failure detector. + gcs_redis_failure_detector_ = std::make_shared( + main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); }); + gcs_redis_failure_detector_->Start(); // Init gcs pub sub instance. gcs_pub_sub_ = std::make_shared(redis_gcs_client_->GetRedisClient()); @@ -50,86 +59,55 @@ void GcsServer::Start() { gcs_table_storage_ = std::make_shared(redis_gcs_client_->GetRedisClient()); + // Load gcs tables data asynchronously. + auto gcs_init_data = std::make_shared(gcs_table_storage_); + gcs_init_data->AsyncLoad([this, gcs_init_data] { DoStart(*gcs_init_data); }); +} + +void GcsServer::DoStart(const GcsInitData &gcs_init_data) { // Init gcs node_manager. - InitGcsNodeManager(); + InitGcsNodeManager(gcs_init_data); - // Init gcs detector. - gcs_redis_failure_detector_ = std::make_shared( - main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); }); - gcs_redis_failure_detector_->Start(); - - // Init gcs actor manager. - InitGcsActorManager(); + // Init gcs job manager. + InitGcsJobManager(); // Init gcs placement group manager. - InitGcsPlacementGroupManager(); + InitGcsPlacementGroupManager(gcs_init_data); - // Register rpc service. - gcs_object_manager_ = InitObjectManager(); - object_info_service_.reset( - new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_)); - rpc_server_.RegisterService(*object_info_service_); + // Init gcs actor manager. + InitGcsActorManager(gcs_init_data); - task_info_handler_ = InitTaskInfoHandler(); - task_info_service_.reset( - new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_)); - rpc_server_.RegisterService(*task_info_service_); + // Init object manager. + InitObjectManager(gcs_init_data); - InitGcsJobManager(); - job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_)); - rpc_server_.RegisterService(*job_info_service_); + // Init gcs worker manager. + InitGcsWorkerManager(); - actor_info_service_.reset( - new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_)); - rpc_server_.RegisterService(*actor_info_service_); + // Init task info handler. + InitTaskInfoHandler(); - placement_group_info_service_.reset(new rpc::PlacementGroupInfoGrpcService( - main_service_, *gcs_placement_group_manager_)); - rpc_server_.RegisterService(*placement_group_info_service_); + // Init stats handler. + InitStatsHandler(); - node_info_service_.reset( - new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_)); - rpc_server_.RegisterService(*node_info_service_); + // Install event listeners. + InstallEventListeners(); - stats_handler_ = InitStatsHandler(); - stats_service_.reset(new rpc::StatsGrpcService(main_service_, *stats_handler_)); - rpc_server_.RegisterService(*stats_service_); + // Start RPC server when all tables have finished loading initial + // data. + rpc_server_.Run(); - gcs_worker_manager_ = InitGcsWorkerManager(); - worker_info_service_.reset( - new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_)); - rpc_server_.RegisterService(*worker_info_service_); - - auto load_completed_count = std::make_shared(0); - int load_count = 3; - auto on_done = [this, load_count, load_completed_count]() { - ++(*load_completed_count); - - // We will reschedule the unfinished actors, so we have to load the actor data at the - // end to make sure the other table data is loaded. - if (*load_completed_count == load_count) { - auto actor_manager_load_initial_data_callback = [this]() { - // Start RPC server when all tables have finished loading initial data. - rpc_server_.Run(); - - // Store gcs rpc server address in redis. - StoreGcsServerAddressInRedis(); - - // Only after the rpc_server_ is running can the node failure detector be run. - // Otherwise the node failure detector will mistake some living nodes as dead - // as the timer inside node failure detector is already run. - gcs_node_manager_->StartNodeFailureDetector(); - is_started_ = true; - }; - gcs_actor_manager_->LoadInitialData(actor_manager_load_initial_data_callback); - } - }; - gcs_object_manager_->LoadInitialData(on_done); - gcs_node_manager_->LoadInitialData(on_done); - gcs_placement_group_manager_->LoadInitialData(on_done); + // Store gcs rpc server address in redis. + StoreGcsServerAddressInRedis(); + // Only after the rpc_server_ is running can the node failure + // detector be run. Otherwise the node failure detector will mistake + // some living nodes as dead as the timer inside node failure + // detector is already run. + gcs_node_manager_->StartNodeFailureDetector(); // Print debug info periodically. PrintDebugInfo(); + + is_started_ = true; } void GcsServer::Stop() { @@ -148,17 +126,8 @@ void GcsServer::Stop() { } } -void GcsServer::InitBackendClient() { - GcsClientOptions options(config_.redis_address, config_.redis_port, - config_.redis_password, config_.is_test); - redis_gcs_client_ = std::make_shared(options); - auto status = redis_gcs_client_->Connect(main_service_); - RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status; -} - -void GcsServer::InitGcsNodeManager() { - RAY_CHECK(redis_gcs_client_ != nullptr); - +void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { + RAY_CHECK(redis_gcs_client_ && gcs_table_storage_ && gcs_pub_sub_); node_manager_io_service_thread_.reset(new std::thread([this] { /// The asio work to keep node_manager_io_service_ alive. boost::asio::io_service::work node_manager_io_service_work_(node_manager_io_service_); @@ -166,10 +135,24 @@ void GcsServer::InitGcsNodeManager() { })); gcs_node_manager_ = std::make_shared( main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_); + // Initialize by gcs tables data. + gcs_node_manager_->Initialize(gcs_init_data); + // Register service. + node_info_service_.reset( + new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_)); + rpc_server_.RegisterService(*node_info_service_); } -void GcsServer::InitGcsActorManager() { - RAY_CHECK(gcs_table_storage_ != nullptr && gcs_node_manager_ != nullptr); +void GcsServer::InitGcsJobManager() { + RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); + gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_)); + // Register service. + job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_)); + rpc_server_.RegisterService(*job_info_service_); +} + +void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { + RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_); auto scheduler = std::make_shared( main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_, /*schedule_failure_handler=*/ @@ -203,46 +186,16 @@ void GcsServer::InitGcsActorManager() { [this](const rpc::Address &address) { return std::make_shared(address, client_call_manager_); }); - - gcs_node_manager_->AddNodeAddedListener( - [this](const std::shared_ptr &) { - // Because a new node has been added, we need to try to schedule the pending - // placement groups and the pending actors. - gcs_placement_group_manager_->SchedulePendingPlacementGroups(); - gcs_actor_manager_->SchedulePendingActors(); - }); - - gcs_node_manager_->AddNodeRemovedListener( - [this](std::shared_ptr node) { - // All of the related placement groups and actors should be reconstructed when a - // node is removed from the GCS. - gcs_placement_group_manager_->OnNodeDead(NodeID::FromBinary(node->node_id())); - gcs_actor_manager_->OnNodeDead(NodeID::FromBinary(node->node_id())); - }); - - auto on_subscribe = [this](const std::string &id, const std::string &data) { - rpc::WorkerTableData worker_failure_data; - worker_failure_data.ParseFromString(data); - auto &worker_address = worker_failure_data.worker_address(); - WorkerID worker_id = WorkerID::FromBinary(id); - NodeID node_id = NodeID::FromBinary(worker_address.raylet_id()); - gcs_actor_manager_->OnWorkerDead(node_id, worker_id, - worker_failure_data.intentional_disconnect()); - }; - RAY_CHECK_OK(gcs_pub_sub_->SubscribeAll(WORKER_CHANNEL, on_subscribe, nullptr)); + // Initialize by gcs tables data. + gcs_actor_manager_->Initialize(gcs_init_data); + // Register service. + actor_info_service_.reset( + new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_)); + rpc_server_.RegisterService(*actor_info_service_); } -void GcsServer::InitGcsJobManager() { - gcs_job_manager_ = - std::unique_ptr(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_)); - gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr job_id) { - gcs_actor_manager_->OnJobFinished(*job_id); - gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id); - }); -} - -void GcsServer::InitGcsPlacementGroupManager() { - RAY_CHECK(gcs_table_storage_ != nullptr && gcs_node_manager_ != nullptr); +void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { + RAY_CHECK(gcs_table_storage_ && gcs_node_manager_); auto scheduler = std::make_shared( main_service_, gcs_table_storage_, *gcs_node_manager_, /*lease_client_factory=*/ @@ -255,11 +208,24 @@ void GcsServer::InitGcsPlacementGroupManager() { gcs_placement_group_manager_ = std::make_shared( main_service_, scheduler, gcs_table_storage_, *gcs_node_manager_); + // Initialize by gcs tables data. + gcs_placement_group_manager_->Initialize(gcs_init_data); + // Register service. + placement_group_info_service_.reset(new rpc::PlacementGroupInfoGrpcService( + main_service_, *gcs_placement_group_manager_)); + rpc_server_.RegisterService(*placement_group_info_service_); } -std::unique_ptr GcsServer::InitObjectManager() { - return std::unique_ptr( +void GcsServer::InitObjectManager(const GcsInitData &gcs_init_data) { + RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_); + gcs_object_manager_.reset( new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_)); + // Initialize by gcs tables data. + gcs_object_manager_->Initialize(gcs_init_data); + // Register service. + object_info_service_.reset( + new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_)); + rpc_server_.RegisterService(*object_info_service_); } void GcsServer::StoreGcsServerAddressInRedis() { @@ -277,19 +243,65 @@ void GcsServer::StoreGcsServerAddressInRedis() { RAY_LOG(INFO) << "Finished setting gcs server address: " << address; } -std::unique_ptr GcsServer::InitTaskInfoHandler() { - return std::unique_ptr( +void GcsServer::InitTaskInfoHandler() { + RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); + task_info_handler_.reset( new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_pub_sub_)); + // Register service. + task_info_service_.reset( + new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_)); + rpc_server_.RegisterService(*task_info_service_); } -std::unique_ptr GcsServer::InitStatsHandler() { - return std::unique_ptr( - new rpc::DefaultStatsHandler(gcs_table_storage_)); +void GcsServer::InitStatsHandler() { + RAY_CHECK(gcs_table_storage_); + stats_handler_.reset(new rpc::DefaultStatsHandler(gcs_table_storage_)); + // Register service. + stats_service_.reset(new rpc::StatsGrpcService(main_service_, *stats_handler_)); + rpc_server_.RegisterService(*stats_service_); } -std::unique_ptr GcsServer::InitGcsWorkerManager() { - return std::unique_ptr( +void GcsServer::InitGcsWorkerManager() { + gcs_worker_manager_ = std::unique_ptr( new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_)); + // Register service. + worker_info_service_.reset( + new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_)); + rpc_server_.RegisterService(*worker_info_service_); +} + +void GcsServer::InstallEventListeners() { + // Install node event listeners. + gcs_node_manager_->AddNodeAddedListener([this](std::shared_ptr node) { + // Because a new node has been added, we need to try to schedule the pending + // placement groups and the pending actors. + gcs_placement_group_manager_->SchedulePendingPlacementGroups(); + gcs_actor_manager_->SchedulePendingActors(); + }); + gcs_node_manager_->AddNodeRemovedListener( + [this](std::shared_ptr node) { + auto node_id = NodeID::FromBinary(node->node_id()); + // All of the related placement groups and actors should be reconstructed when a + // node is removed from the GCS. + gcs_placement_group_manager_->OnNodeDead(node_id); + gcs_actor_manager_->OnNodeDead(node_id); + }); + + // Install worker event listener. + gcs_worker_manager_->AddWorkerDeadListener( + [this](std::shared_ptr worker_failure_data) { + auto &worker_address = worker_failure_data->worker_address(); + auto worker_id = WorkerID::FromBinary(worker_address.worker_id()); + auto node_id = NodeID::FromBinary(worker_address.raylet_id()); + gcs_actor_manager_->OnWorkerDead(node_id, worker_id, + worker_failure_data->intentional_disconnect()); + }); + + // Install job event listeners. + gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr job_id) { + gcs_actor_manager_->OnJobFinished(*job_id); + gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id); + }); } void GcsServer::CollectStats() { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 7b4e2e3bc..ef916151f 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -14,6 +14,7 @@ #pragma once +#include "ray/gcs/gcs_server/gcs_init_data.h" #include "ray/gcs/gcs_server/gcs_object_manager.h" #include "ray/gcs/gcs_server/gcs_redis_failure_detector.h" #include "ray/gcs/gcs_server/gcs_table_storage.h" @@ -71,36 +72,34 @@ class GcsServer { bool IsStopped() const { return is_stopped_; } protected: - /// Initialize the backend storage client. - /// The gcs server is just the proxy between the gcs client and reliable storage - /// for the time being, so we need a backend client to connect to the storage. - virtual void InitBackendClient(); + void DoStart(const GcsInitData &gcs_init_data); - /// Initialize the gcs node manager. - /// The gcs node manager is responsible for managing and monitoring all nodes in the - /// cluster. - virtual void InitGcsNodeManager(); + /// Initialize gcs node manager. + void InitGcsNodeManager(const GcsInitData &gcs_init_data); - /// Initialize the gcs actor manager. - virtual void InitGcsActorManager(); + /// Initialize gcs job manager. + void InitGcsJobManager(); - /// Initialize the gcs job manager. - virtual void InitGcsJobManager(); + /// Initialize gcs actor manager. + void InitGcsActorManager(const GcsInitData &gcs_init_data); - /// Initialize the gcs placement group manager. - virtual void InitGcsPlacementGroupManager(); + /// Initialize gcs placement group manager. + void InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data); - /// The object manager - virtual std::unique_ptr InitObjectManager(); + /// Initialize gcs object manager. + void InitObjectManager(const GcsInitData &gcs_init_data); - /// The task info handler - virtual std::unique_ptr InitTaskInfoHandler(); + /// Initialize gcs worker manager. + void InitGcsWorkerManager(); - /// The stats handler - virtual std::unique_ptr InitStatsHandler(); + /// Initialize task info handler. + void InitTaskInfoHandler(); - /// The worker manager - virtual std::unique_ptr InitGcsWorkerManager(); + /// Initialize stats handler. + void InitStatsHandler(); + + /// Install event listeners. + void InstallEventListeners(); private: /// Store the address of GCS server in Redis. diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.cc b/src/ray/gcs/gcs_server/gcs_worker_manager.cc index c9e5e8b94..55768bdfe 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.cc @@ -40,6 +40,10 @@ void GcsWorkerManager::HandleReportWorkerFailure( worker_failure_data->CopyFrom(request.worker_failure()); worker_failure_data->set_is_alive(false); + for (auto &listener : worker_dead_listeners_) { + listener(worker_failure_data); + } + auto on_done = [this, worker_address, worker_id, node_id, worker_failure_data, reply, send_reply_callback](const Status &status) { if (!status.ok()) { @@ -130,5 +134,11 @@ void GcsWorkerManager::HandleAddWorkerInfo(const rpc::AddWorkerInfoRequest &requ } } +void GcsWorkerManager::AddWorkerDeadListener( + std::function)> listener) { + RAY_CHECK(listener != nullptr); + worker_dead_listeners_.emplace_back(std::move(listener)); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.h b/src/ray/gcs/gcs_server/gcs_worker_manager.h index b81c787c6..094e881e6 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.h +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.h @@ -45,9 +45,14 @@ class GcsWorkerManager : public rpc::WorkerInfoHandler { rpc::AddWorkerInfoReply *reply, rpc::SendReplyCallback send_reply_callback) override; + void AddWorkerDeadListener( + std::function)> listener); + private: std::shared_ptr gcs_table_storage_; std::shared_ptr gcs_pub_sub_; + std::vector)>> + worker_dead_listeners_; }; } // namespace gcs