[GCS] refactor gcs initialization (#11890)

This commit is contained in:
ZhuSenlin
2020-11-24 21:11:18 +08:00
committed by GitHub
parent be7938ee09
commit 1ae4d2873a
14 changed files with 546 additions and 301 deletions
+60 -65
View File
@@ -945,76 +945,71 @@ void GcsActorManager::SchedulePendingActors() {
}
}
void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto callback = [this,
done](const std::unordered_map<ActorID, ActorTableData> &result) {
std::unordered_map<NodeID, std::vector<WorkerID>> node_to_workers;
for (auto &item : result) {
auto actor = std::make_shared<GcsActor>(item.second);
if (item.second.state() != ray::rpc::ActorTableData::DEAD) {
registered_actors_.emplace(item.first, actor);
void GcsActorManager::Initialize(const GcsInitData &gcs_init_data) {
const auto &jobs = gcs_init_data.Jobs();
std::unordered_map<NodeID, std::vector<WorkerID>> node_to_workers;
for (const auto &entry : gcs_init_data.Actors()) {
auto job_iter = jobs.find(entry.first.JobId());
auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead());
auto actor = std::make_shared<GcsActor>(entry.second);
if (entry.second.state() != ray::rpc::ActorTableData::DEAD && !is_job_dead) {
registered_actors_.emplace(entry.first, actor);
if (!actor->GetName().empty()) {
named_actors_.emplace(actor->GetName(), actor->GetActorID());
}
if (item.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) {
const auto &owner = actor->GetOwnerAddress();
const auto &owner_node = NodeID::FromBinary(owner.raylet_id());
const auto &owner_worker = WorkerID::FromBinary(owner.worker_id());
RAY_CHECK(unresolved_actors_[owner_node][owner_worker]
.emplace(actor->GetActorID())
.second);
} else if (item.second.state() == ray::rpc::ActorTableData::ALIVE) {
created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(),
actor->GetActorID());
}
if (!actor->IsDetached()) {
// This actor is owned. Send a long polling request to the actor's
// owner to determine when the actor should be removed.
PollOwnerForActorOutOfScope(actor);
}
if (!actor->GetWorkerID().IsNil()) {
RAY_CHECK(!actor->GetNodeID().IsNil());
node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID());
}
} else {
destroyed_actors_.emplace(item.first, actor);
sorted_destroyed_actor_list_.emplace_back(item.first,
(int64_t)item.second.timestamp());
if (!actor->GetName().empty()) {
named_actors_.emplace(actor->GetName(), actor->GetActorID());
}
}
sorted_destroyed_actor_list_.sort([](const std::pair<ActorID, int64_t> &left,
const std::pair<ActorID, int64_t> &right) {
return left.second < right.second;
});
// Notify raylets to release unused workers.
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size()
<< ", and the number of created actors is " << created_actors_.size();
for (auto &item : registered_actors_) {
auto &actor = item.second;
if (actor->GetState() == ray::rpc::ActorTableData::PENDING_CREATION ||
actor->GetState() == ray::rpc::ActorTableData::RESTARTING) {
// We should not reschedule actors in state of `ALIVE`.
// We could not reschedule actors in state of `DEPENDENCIES_UNREADY` because the
// dependencies of them may not have been resolved yet.
RAY_LOG(INFO) << "Rescheduling a non-alive actor, actor id = "
<< actor->GetActorID() << ", state = " << actor->GetState()
<< ", job id = " << actor->GetActorID().JobId();
gcs_actor_scheduler_->Reschedule(actor);
if (entry.second.state() == ray::rpc::ActorTableData::DEPENDENCIES_UNREADY) {
const auto &owner = actor->GetOwnerAddress();
const auto &owner_node = NodeID::FromBinary(owner.raylet_id());
const auto &owner_worker = WorkerID::FromBinary(owner.worker_id());
RAY_CHECK(unresolved_actors_[owner_node][owner_worker]
.emplace(actor->GetActorID())
.second);
} else if (entry.second.state() == ray::rpc::ActorTableData::ALIVE) {
created_actors_[actor->GetNodeID()].emplace(actor->GetWorkerID(),
actor->GetActorID());
}
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetAll(callback));
if (!actor->IsDetached()) {
// This actor is owned. Send a long polling request to the actor's
// owner to determine when the actor should be removed.
PollOwnerForActorOutOfScope(actor);
}
if (!actor->GetWorkerID().IsNil()) {
RAY_CHECK(!actor->GetNodeID().IsNil());
node_to_workers[actor->GetNodeID()].emplace_back(actor->GetWorkerID());
}
} else {
destroyed_actors_.emplace(entry.first, actor);
sorted_destroyed_actor_list_.emplace_back(entry.first,
(int64_t)entry.second.timestamp());
}
}
sorted_destroyed_actor_list_.sort([](const std::pair<ActorID, int64_t> &left,
const std::pair<ActorID, int64_t> &right) {
return left.second < right.second;
});
// Notify raylets to release unused workers.
gcs_actor_scheduler_->ReleaseUnusedWorkers(node_to_workers);
RAY_LOG(DEBUG) << "The number of registered actors is " << registered_actors_.size()
<< ", and the number of created actors is " << created_actors_.size();
for (auto &item : registered_actors_) {
auto &actor = item.second;
if (actor->GetState() == ray::rpc::ActorTableData::PENDING_CREATION ||
actor->GetState() == ray::rpc::ActorTableData::RESTARTING) {
// We should not reschedule actors in state of `ALIVE`.
// We could not reschedule actors in state of `DEPENDENCIES_UNREADY` because the
// dependencies of them may not have been resolved yet.
RAY_LOG(INFO) << "Rescheduling a non-alive actor, actor id = "
<< actor->GetActorID() << ", state = " << actor->GetState()
<< ", job id = " << actor->GetActorID().JobId();
gcs_actor_scheduler_->Reschedule(actor);
}
}
}
void GcsActorManager::OnJobFinished(const JobID &job_id) {
+4 -3
View File
@@ -21,6 +21,7 @@
#include "ray/common/task/task_execution_spec.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
@@ -275,11 +276,11 @@ class GcsActorManager : public rpc::ActorInfoHandler {
/// \param actor The actor that has been created.
void OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor);
/// Load initial data from gcs storage to memory cache asynchronously.
/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
/// \param gcs_init_data.
void Initialize(const GcsInitData &gcs_init_data);
/// Delete non-detached actor information from durable storage once the associated job
/// finishes.
+120
View File
@@ -0,0 +1,120 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/gcs_init_data.h"
namespace ray {
namespace gcs {
void GcsInitData::AsyncLoad(const EmptyCallback &on_done) {
// There are 6 kinds of table data need to be loaded.
auto count_down = std::make_shared<int>(6);
auto on_load_finished = [count_down, on_done] {
if (--(*count_down) == 0) {
if (on_done) {
on_done();
}
}
};
AsyncLoadJobTableData(on_load_finished);
AsyncLoadNodeTableData(on_load_finished);
AsyncLoadObjectTableData(on_load_finished);
AsyncLoadResourceTableData(on_load_finished);
AsyncLoadActorTableData(on_load_finished);
AsyncLoadPlacementGroupTableData(on_load_finished);
}
void GcsInitData::AsyncLoadJobTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading job table data.";
auto load_job_table_data_callback =
[this, on_done](const std::unordered_map<JobID, rpc::JobTableData> &result) {
job_table_data_ = result;
RAY_LOG(INFO) << "Finished loading job table data, size = "
<< job_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(load_job_table_data_callback));
}
void GcsInitData::AsyncLoadNodeTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading node table data.";
auto load_node_table_data_callback =
[this, on_done](const std::unordered_map<NodeID, rpc::GcsNodeInfo> &result) {
node_table_data_ = result;
RAY_LOG(INFO) << "Finished loading node table data, size = "
<< node_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(load_node_table_data_callback));
}
void GcsInitData::AsyncLoadObjectTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading object table data.";
auto load_object_table_data_callback =
[this,
on_done](const std::unordered_map<ObjectID, rpc::ObjectLocationInfo> &result) {
object_table_data_ = result;
RAY_LOG(INFO) << "Finished loading object table data, size = "
<< object_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(load_object_table_data_callback));
}
void GcsInitData::AsyncLoadResourceTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading cluster resources table data.";
auto load_resource_table_data_callback =
[this, on_done](const std::unordered_map<NodeID, rpc::ResourceMap> &result) {
resource_table_data_ = result;
RAY_LOG(INFO) << "Finished loading cluster resources table data, size = "
<< resource_table_data_.size();
on_done();
};
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().GetAll(load_resource_table_data_callback));
}
void GcsInitData::AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading placement group table data.";
auto load_placement_group_table_data_callback =
[this, on_done](const std::unordered_map<PlacementGroupID,
rpc::PlacementGroupTableData> &result) {
placement_group_table_data_ = result;
RAY_LOG(INFO) << "Finished loading placement group table data, size = "
<< placement_group_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(
load_placement_group_table_data_callback));
}
void GcsInitData::AsyncLoadActorTableData(const EmptyCallback &on_done) {
RAY_LOG(INFO) << "Loading actor table data.";
auto load_actor_table_data_callback =
[this, on_done](const std::unordered_map<ActorID, ActorTableData> &result) {
actor_table_data_ = result;
RAY_LOG(INFO) << "Finished loading actor table data, size = "
<< actor_table_data_.size();
on_done();
};
RAY_CHECK_OK(gcs_table_storage_->ActorTable().GetAll(load_actor_table_data_callback));
}
} // namespace gcs
} // namespace ray
+129
View File
@@ -0,0 +1,129 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "src/ray/protobuf/gcs.pb.h"
namespace ray {
namespace gcs {
/// `GcsInitData` is used to initialize all modules which need to recovery status when GCS
/// server restarts.
/// It loads all required metadata from the store into memory at once, so that the next
/// initialization process can be synchronized.
class GcsInitData {
public:
/// Create a GcsInitData.
///
/// \param gcs_table_storage The storage from which the metadata will be loaded.
explicit GcsInitData(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: gcs_table_storage_(std::move(gcs_table_storage)) {}
/// Load all required metadata from the store into memory at once asynchronously.
///
/// \param on_done The callback when all metadatas are loaded successfully.
void AsyncLoad(const EmptyCallback &on_done);
/// Get job metadata.
const std::unordered_map<JobID, rpc::JobTableData> &Jobs() const {
return job_table_data_;
}
/// Get node metadata.
const std::unordered_map<NodeID, rpc::GcsNodeInfo> &Nodes() const {
return node_table_data_;
}
/// Get object location metadata.
const std::unordered_map<ObjectID, rpc::ObjectLocationInfo> &Objects() const {
return object_table_data_;
}
/// Get resource metadata.
const std::unordered_map<NodeID, rpc::ResourceMap> &ClusterResources() const {
return resource_table_data_;
}
/// Get actor metadata.
const std::unordered_map<ActorID, rpc::ActorTableData> &Actors() const {
return actor_table_data_;
}
/// Get placement group metadata.
const std::unordered_map<PlacementGroupID, rpc::PlacementGroupTableData>
&PlacementGroups() const {
return placement_group_table_data_;
}
private:
/// Load job metadata from the store into memory asynchronously.
///
/// \param on_done The callback when job metadata is loaded successfully.
void AsyncLoadJobTableData(const EmptyCallback &on_done);
/// Load node metadata from the store into memory asynchronously.
///
/// \param on_done The callback when node metadata is loaded successfully.
void AsyncLoadNodeTableData(const EmptyCallback &on_done);
/// Load object locations metadata from the store into memory asynchronously.
///
/// \param on_done The callback when object location metadata is loaded successfully.
void AsyncLoadObjectTableData(const EmptyCallback &on_done);
/// Load resource metadata from the store into memory asynchronously.
///
/// \param on_done The callback when resource metadata is loaded successfully.
void AsyncLoadResourceTableData(const EmptyCallback &on_done);
/// Load placement group metadata from the store into memory asynchronously.
///
/// \param on_done The callback when placement group metadata is loaded successfully.
void AsyncLoadPlacementGroupTableData(const EmptyCallback &on_done);
/// Load actor metadata from the store into memory asynchronously.
///
/// \param on_done The callback when actor metadata is loaded successfully.
void AsyncLoadActorTableData(const EmptyCallback &on_done);
protected:
/// The gcs table storage.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
/// Job metadata.
std::unordered_map<JobID, rpc::JobTableData> job_table_data_;
/// Node metadata.
std::unordered_map<NodeID, rpc::GcsNodeInfo> node_table_data_;
/// Object location metadata.
std::unordered_map<ObjectID, rpc::ObjectLocationInfo> object_table_data_;
/// Resource metadata.
std::unordered_map<NodeID, rpc::ResourceMap> resource_table_data_;
/// Placement group metadata.
std::unordered_map<PlacementGroupID, rpc::PlacementGroupTableData>
placement_group_table_data_;
/// Actor metadata.
std::unordered_map<ActorID, rpc::ActorTableData> actor_table_data_;
};
} // namespace gcs
} // namespace ray
+18 -32
View File
@@ -489,40 +489,26 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
return removed_node;
}
void GcsNodeManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto get_node_callback = [this,
done](const std::unordered_map<NodeID, GcsNodeInfo> &result) {
for (auto &item : result) {
if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
// Call `AddNode` for this node to make sure it is tracked by the failure
// detector.
AddNode(std::make_shared<rpc::GcsNodeInfo>(item.second));
} else if (item.second.state() == rpc::GcsNodeInfo::DEAD) {
dead_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
sorted_dead_node_list_.emplace_back(item.first, item.second.timestamp());
}
void GcsNodeManager::Initialize(const GcsInitData &gcs_init_data) {
for (const auto &item : gcs_init_data.Nodes()) {
if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
// Call `AddNode` for this node to make sure it is tracked by the failure
// detector.
AddNode(std::make_shared<rpc::GcsNodeInfo>(item.second));
} else if (item.second.state() == rpc::GcsNodeInfo::DEAD) {
dead_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
sorted_dead_node_list_.emplace_back(item.first, item.second.timestamp());
}
sorted_dead_node_list_.sort([](const std::pair<NodeID, int64_t> &left,
const std::pair<NodeID, int64_t> &right) {
return left.second < right.second;
});
}
sorted_dead_node_list_.sort(
[](const std::pair<NodeID, int64_t> &left,
const std::pair<NodeID, int64_t> &right) { return left.second < right.second; });
auto get_node_resource_callback =
[this, done](const std::unordered_map<NodeID, ResourceMap> &result) {
for (auto &item : result) {
if (alive_nodes_.count(item.first)) {
cluster_resources_[item.first] = item.second;
}
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().GetAll(get_node_resource_callback));
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(get_node_callback));
for (auto &entry : gcs_init_data.ClusterResources()) {
if (alive_nodes_.count(entry.first)) {
cluster_resources_[entry.first] = entry.second;
}
}
}
void GcsNodeManager::StartNodeFailureDetector() {
+4 -3
View File
@@ -18,6 +18,7 @@
#include "absl/container/flat_hash_set.h"
#include "ray/common/id.h"
#include "ray/gcs/accessor.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/rpc/client_call.h"
@@ -152,11 +153,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
node_added_listeners_.emplace_back(std::move(listener));
}
/// Load initial data from gcs storage to memory cache asynchronously.
/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
/// \param gcs_init_data.
void Initialize(const GcsInitData &gcs_init_data);
/// Start node failure detector.
void StartNodeFailureDetector();
+9 -16
View File
@@ -288,24 +288,17 @@ const ObjectLocationInfo GcsObjectManager::GenObjectLocationInfo(
return object_data;
}
void GcsObjectManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "Loading initial data.";
auto callback = [this,
done](const std::unordered_map<ObjectID, ObjectLocationInfo> &result) {
absl::flat_hash_map<NodeID, ObjectSet> node_to_objects;
for (auto &item : result) {
for (const auto &loc : item.second.locations()) {
node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first);
}
void GcsObjectManager::Initialize(const GcsInitData &gcs_init_data) {
absl::flat_hash_map<NodeID, ObjectSet> node_to_objects;
for (const auto &item : gcs_init_data.Objects()) {
for (const auto &loc : item.second.locations()) {
node_to_objects[NodeID::FromBinary(loc.manager())].insert(item.first);
}
}
for (auto &item : node_to_objects) {
AddObjectsLocation(item.first, item.second);
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(gcs_table_storage_->ObjectTable().GetAll(callback));
for (auto &item : node_to_objects) {
AddObjectsLocation(item.first, item.second);
}
}
std::string GcsObjectManager::DebugString() const {
+4 -3
View File
@@ -14,6 +14,7 @@
#pragma once
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
#include "ray/gcs/pubsub/gcs_pub_sub.h"
@@ -53,11 +54,11 @@ class GcsObjectManager : public rpc::ObjectInfoHandler {
rpc::RemoveObjectLocationReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Load initial data from gcs storage to memory cache asynchronously.
/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
/// \param gcs_init_data.
void Initialize(const GcsInitData &gcs_init_data);
std::string DebugString() const;
@@ -469,42 +469,34 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
gcs_node_manager_.UpdatePlacementGroupLoad(move(placement_group_load));
}
void GcsPlacementGroupManager::LoadInitialData(const EmptyCallback &done) {
RAY_LOG(INFO) << "GcsPlacementGroupManager loading initial data.";
auto callback = [this,
done](const std::unordered_map<PlacementGroupID,
rpc::PlacementGroupTableData> &result) {
std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundles;
for (auto &item : result) {
auto placement_group = std::make_shared<GcsPlacementGroup>(item.second);
if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
registered_placement_groups_.emplace(item.first, placement_group);
void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
std::unordered_map<NodeID, std::vector<rpc::Bundle>> node_to_bundles;
for (auto &item : gcs_init_data.PlacementGroups()) {
auto placement_group = std::make_shared<GcsPlacementGroup>(item.second);
if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
registered_placement_groups_.emplace(item.first, placement_group);
if (item.second.state() == rpc::PlacementGroupTableData::PENDING ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
if (item.second.state() == rpc::PlacementGroupTableData::PENDING ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
pending_placement_groups_.emplace_back(std::move(placement_group));
}
if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
const auto &bundles = item.second.bundles();
for (auto &bundle : bundles) {
if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
}
if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||
item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
const auto &bundles = item.second.bundles();
for (auto &bundle : bundles) {
if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
}
}
}
}
}
// Notify raylets to release unused bundles.
gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);
// Notify raylets to release unused bundles.
gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);
SchedulePendingPlacementGroups();
RAY_LOG(INFO) << "Finished loading initial data.";
done();
};
RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().GetAll(callback));
SchedulePendingPlacementGroups();
}
std::string GcsPlacementGroupManager::DebugString() const {
@@ -20,6 +20,7 @@
#include "ray/common/id.h"
#include "ray/common/task/task_execution_spec.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
#include "ray/gcs/gcs_server/gcs_placement_group_scheduler.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
@@ -231,11 +232,11 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// Collect stats from gcs placement group manager in-memory data structures.
void CollectStats() const;
/// Load initial data from gcs storage to memory cache asynchronously.
/// Initialize with the gcs tables data synchronously.
/// This should be called when GCS server restarts after a failure.
///
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
/// \param gcs_init_data.
void Initialize(const GcsInitData &gcs_init_data);
std::string DebugString() const;
+138 -126
View File
@@ -41,7 +41,16 @@ GcsServer::~GcsServer() { Stop(); }
void GcsServer::Start() {
// Init backend client.
InitBackendClient();
GcsClientOptions options(config_.redis_address, config_.redis_port,
config_.redis_password, config_.is_test);
redis_gcs_client_ = std::make_shared<RedisGcsClient>(options);
auto status = redis_gcs_client_->Connect(main_service_);
RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status;
// Init redis failure detector.
gcs_redis_failure_detector_ = std::make_shared<GcsRedisFailureDetector>(
main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
gcs_redis_failure_detector_->Start();
// Init gcs pub sub instance.
gcs_pub_sub_ = std::make_shared<gcs::GcsPubSub>(redis_gcs_client_->GetRedisClient());
@@ -50,86 +59,55 @@ void GcsServer::Start() {
gcs_table_storage_ =
std::make_shared<gcs::RedisGcsTableStorage>(redis_gcs_client_->GetRedisClient());
// Load gcs tables data asynchronously.
auto gcs_init_data = std::make_shared<GcsInitData>(gcs_table_storage_);
gcs_init_data->AsyncLoad([this, gcs_init_data] { DoStart(*gcs_init_data); });
}
void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init gcs node_manager.
InitGcsNodeManager();
InitGcsNodeManager(gcs_init_data);
// Init gcs detector.
gcs_redis_failure_detector_ = std::make_shared<GcsRedisFailureDetector>(
main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
gcs_redis_failure_detector_->Start();
// Init gcs actor manager.
InitGcsActorManager();
// Init gcs job manager.
InitGcsJobManager();
// Init gcs placement group manager.
InitGcsPlacementGroupManager();
InitGcsPlacementGroupManager(gcs_init_data);
// Register rpc service.
gcs_object_manager_ = InitObjectManager();
object_info_service_.reset(
new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_));
rpc_server_.RegisterService(*object_info_service_);
// Init gcs actor manager.
InitGcsActorManager(gcs_init_data);
task_info_handler_ = InitTaskInfoHandler();
task_info_service_.reset(
new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_));
rpc_server_.RegisterService(*task_info_service_);
// Init object manager.
InitObjectManager(gcs_init_data);
InitGcsJobManager();
job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_));
rpc_server_.RegisterService(*job_info_service_);
// Init gcs worker manager.
InitGcsWorkerManager();
actor_info_service_.reset(
new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_));
rpc_server_.RegisterService(*actor_info_service_);
// Init task info handler.
InitTaskInfoHandler();
placement_group_info_service_.reset(new rpc::PlacementGroupInfoGrpcService(
main_service_, *gcs_placement_group_manager_));
rpc_server_.RegisterService(*placement_group_info_service_);
// Init stats handler.
InitStatsHandler();
node_info_service_.reset(
new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_));
rpc_server_.RegisterService(*node_info_service_);
// Install event listeners.
InstallEventListeners();
stats_handler_ = InitStatsHandler();
stats_service_.reset(new rpc::StatsGrpcService(main_service_, *stats_handler_));
rpc_server_.RegisterService(*stats_service_);
// Start RPC server when all tables have finished loading initial
// data.
rpc_server_.Run();
gcs_worker_manager_ = InitGcsWorkerManager();
worker_info_service_.reset(
new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_));
rpc_server_.RegisterService(*worker_info_service_);
auto load_completed_count = std::make_shared<int>(0);
int load_count = 3;
auto on_done = [this, load_count, load_completed_count]() {
++(*load_completed_count);
// We will reschedule the unfinished actors, so we have to load the actor data at the
// end to make sure the other table data is loaded.
if (*load_completed_count == load_count) {
auto actor_manager_load_initial_data_callback = [this]() {
// Start RPC server when all tables have finished loading initial data.
rpc_server_.Run();
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
// Only after the rpc_server_ is running can the node failure detector be run.
// Otherwise the node failure detector will mistake some living nodes as dead
// as the timer inside node failure detector is already run.
gcs_node_manager_->StartNodeFailureDetector();
is_started_ = true;
};
gcs_actor_manager_->LoadInitialData(actor_manager_load_initial_data_callback);
}
};
gcs_object_manager_->LoadInitialData(on_done);
gcs_node_manager_->LoadInitialData(on_done);
gcs_placement_group_manager_->LoadInitialData(on_done);
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
// Only after the rpc_server_ is running can the node failure
// detector be run. Otherwise the node failure detector will mistake
// some living nodes as dead as the timer inside node failure
// detector is already run.
gcs_node_manager_->StartNodeFailureDetector();
// Print debug info periodically.
PrintDebugInfo();
is_started_ = true;
}
void GcsServer::Stop() {
@@ -148,17 +126,8 @@ void GcsServer::Stop() {
}
}
void GcsServer::InitBackendClient() {
GcsClientOptions options(config_.redis_address, config_.redis_port,
config_.redis_password, config_.is_test);
redis_gcs_client_ = std::make_shared<RedisGcsClient>(options);
auto status = redis_gcs_client_->Connect(main_service_);
RAY_CHECK(status.ok()) << "Failed to init redis gcs client as " << status;
}
void GcsServer::InitGcsNodeManager() {
RAY_CHECK(redis_gcs_client_ != nullptr);
void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(redis_gcs_client_ && gcs_table_storage_ && gcs_pub_sub_);
node_manager_io_service_thread_.reset(new std::thread([this] {
/// The asio work to keep node_manager_io_service_ alive.
boost::asio::io_service::work node_manager_io_service_work_(node_manager_io_service_);
@@ -166,10 +135,24 @@ void GcsServer::InitGcsNodeManager() {
}));
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_);
// Initialize by gcs tables data.
gcs_node_manager_->Initialize(gcs_init_data);
// Register service.
node_info_service_.reset(
new rpc::NodeInfoGrpcService(main_service_, *gcs_node_manager_));
rpc_server_.RegisterService(*node_info_service_);
}
void GcsServer::InitGcsActorManager() {
RAY_CHECK(gcs_table_storage_ != nullptr && gcs_node_manager_ != nullptr);
void GcsServer::InitGcsJobManager() {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
gcs_job_manager_.reset(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
// Register service.
job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *gcs_job_manager_));
rpc_server_.RegisterService(*job_info_service_);
}
void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_);
auto scheduler = std::make_shared<GcsActorScheduler>(
main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_,
/*schedule_failure_handler=*/
@@ -203,46 +186,16 @@ void GcsServer::InitGcsActorManager() {
[this](const rpc::Address &address) {
return std::make_shared<rpc::CoreWorkerClient>(address, client_call_manager_);
});
gcs_node_manager_->AddNodeAddedListener(
[this](const std::shared_ptr<rpc::GcsNodeInfo> &) {
// Because a new node has been added, we need to try to schedule the pending
// placement groups and the pending actors.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
gcs_actor_manager_->SchedulePendingActors();
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
// All of the related placement groups and actors should be reconstructed when a
// node is removed from the GCS.
gcs_placement_group_manager_->OnNodeDead(NodeID::FromBinary(node->node_id()));
gcs_actor_manager_->OnNodeDead(NodeID::FromBinary(node->node_id()));
});
auto on_subscribe = [this](const std::string &id, const std::string &data) {
rpc::WorkerTableData worker_failure_data;
worker_failure_data.ParseFromString(data);
auto &worker_address = worker_failure_data.worker_address();
WorkerID worker_id = WorkerID::FromBinary(id);
NodeID node_id = NodeID::FromBinary(worker_address.raylet_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data.intentional_disconnect());
};
RAY_CHECK_OK(gcs_pub_sub_->SubscribeAll(WORKER_CHANNEL, on_subscribe, nullptr));
// Initialize by gcs tables data.
gcs_actor_manager_->Initialize(gcs_init_data);
// Register service.
actor_info_service_.reset(
new rpc::ActorInfoGrpcService(main_service_, *gcs_actor_manager_));
rpc_server_.RegisterService(*actor_info_service_);
}
void GcsServer::InitGcsJobManager() {
gcs_job_manager_ =
std::unique_ptr<GcsJobManager>(new GcsJobManager(gcs_table_storage_, gcs_pub_sub_));
gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) {
gcs_actor_manager_->OnJobFinished(*job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id);
});
}
void GcsServer::InitGcsPlacementGroupManager() {
RAY_CHECK(gcs_table_storage_ != nullptr && gcs_node_manager_ != nullptr);
void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_node_manager_);
auto scheduler = std::make_shared<GcsPlacementGroupScheduler>(
main_service_, gcs_table_storage_, *gcs_node_manager_,
/*lease_client_factory=*/
@@ -255,11 +208,24 @@ void GcsServer::InitGcsPlacementGroupManager() {
gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_, scheduler, gcs_table_storage_, *gcs_node_manager_);
// Initialize by gcs tables data.
gcs_placement_group_manager_->Initialize(gcs_init_data);
// Register service.
placement_group_info_service_.reset(new rpc::PlacementGroupInfoGrpcService(
main_service_, *gcs_placement_group_manager_));
rpc_server_.RegisterService(*placement_group_info_service_);
}
std::unique_ptr<GcsObjectManager> GcsServer::InitObjectManager() {
return std::unique_ptr<GcsObjectManager>(
void GcsServer::InitObjectManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_);
gcs_object_manager_.reset(
new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_));
// Initialize by gcs tables data.
gcs_object_manager_->Initialize(gcs_init_data);
// Register service.
object_info_service_.reset(
new rpc::ObjectInfoGrpcService(main_service_, *gcs_object_manager_));
rpc_server_.RegisterService(*object_info_service_);
}
void GcsServer::StoreGcsServerAddressInRedis() {
@@ -277,19 +243,65 @@ void GcsServer::StoreGcsServerAddressInRedis() {
RAY_LOG(INFO) << "Finished setting gcs server address: " << address;
}
std::unique_ptr<rpc::TaskInfoHandler> GcsServer::InitTaskInfoHandler() {
return std::unique_ptr<rpc::DefaultTaskInfoHandler>(
void GcsServer::InitTaskInfoHandler() {
RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_);
task_info_handler_.reset(
new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_pub_sub_));
// Register service.
task_info_service_.reset(
new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_));
rpc_server_.RegisterService(*task_info_service_);
}
std::unique_ptr<rpc::StatsHandler> GcsServer::InitStatsHandler() {
return std::unique_ptr<rpc::DefaultStatsHandler>(
new rpc::DefaultStatsHandler(gcs_table_storage_));
void GcsServer::InitStatsHandler() {
RAY_CHECK(gcs_table_storage_);
stats_handler_.reset(new rpc::DefaultStatsHandler(gcs_table_storage_));
// Register service.
stats_service_.reset(new rpc::StatsGrpcService(main_service_, *stats_handler_));
rpc_server_.RegisterService(*stats_service_);
}
std::unique_ptr<GcsWorkerManager> GcsServer::InitGcsWorkerManager() {
return std::unique_ptr<GcsWorkerManager>(
void GcsServer::InitGcsWorkerManager() {
gcs_worker_manager_ = std::unique_ptr<GcsWorkerManager>(
new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_));
// Register service.
worker_info_service_.reset(
new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_));
rpc_server_.RegisterService(*worker_info_service_);
}
void GcsServer::InstallEventListeners() {
// Install node event listeners.
gcs_node_manager_->AddNodeAddedListener([this](std::shared_ptr<rpc::GcsNodeInfo> node) {
// Because a new node has been added, we need to try to schedule the pending
// placement groups and the pending actors.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
gcs_actor_manager_->SchedulePendingActors();
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
auto node_id = NodeID::FromBinary(node->node_id());
// All of the related placement groups and actors should be reconstructed when a
// node is removed from the GCS.
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
});
// Install worker event listener.
gcs_worker_manager_->AddWorkerDeadListener(
[this](std::shared_ptr<rpc::WorkerTableData> worker_failure_data) {
auto &worker_address = worker_failure_data->worker_address();
auto worker_id = WorkerID::FromBinary(worker_address.worker_id());
auto node_id = NodeID::FromBinary(worker_address.raylet_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data->intentional_disconnect());
});
// Install job event listeners.
gcs_job_manager_->AddJobFinishedListener([this](std::shared_ptr<JobID> job_id) {
gcs_actor_manager_->OnJobFinished(*job_id);
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(*job_id);
});
}
void GcsServer::CollectStats() {
+21 -22
View File
@@ -14,6 +14,7 @@
#pragma once
#include "ray/gcs/gcs_server/gcs_init_data.h"
#include "ray/gcs/gcs_server/gcs_object_manager.h"
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
#include "ray/gcs/gcs_server/gcs_table_storage.h"
@@ -71,36 +72,34 @@ class GcsServer {
bool IsStopped() const { return is_stopped_; }
protected:
/// Initialize the backend storage client.
/// The gcs server is just the proxy between the gcs client and reliable storage
/// for the time being, so we need a backend client to connect to the storage.
virtual void InitBackendClient();
void DoStart(const GcsInitData &gcs_init_data);
/// Initialize the gcs node manager.
/// The gcs node manager is responsible for managing and monitoring all nodes in the
/// cluster.
virtual void InitGcsNodeManager();
/// Initialize gcs node manager.
void InitGcsNodeManager(const GcsInitData &gcs_init_data);
/// Initialize the gcs actor manager.
virtual void InitGcsActorManager();
/// Initialize gcs job manager.
void InitGcsJobManager();
/// Initialize the gcs job manager.
virtual void InitGcsJobManager();
/// Initialize gcs actor manager.
void InitGcsActorManager(const GcsInitData &gcs_init_data);
/// Initialize the gcs placement group manager.
virtual void InitGcsPlacementGroupManager();
/// Initialize gcs placement group manager.
void InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data);
/// The object manager
virtual std::unique_ptr<GcsObjectManager> InitObjectManager();
/// Initialize gcs object manager.
void InitObjectManager(const GcsInitData &gcs_init_data);
/// The task info handler
virtual std::unique_ptr<rpc::TaskInfoHandler> InitTaskInfoHandler();
/// Initialize gcs worker manager.
void InitGcsWorkerManager();
/// The stats handler
virtual std::unique_ptr<rpc::StatsHandler> InitStatsHandler();
/// Initialize task info handler.
void InitTaskInfoHandler();
/// The worker manager
virtual std::unique_ptr<GcsWorkerManager> InitGcsWorkerManager();
/// Initialize stats handler.
void InitStatsHandler();
/// Install event listeners.
void InstallEventListeners();
private:
/// Store the address of GCS server in Redis.
@@ -40,6 +40,10 @@ void GcsWorkerManager::HandleReportWorkerFailure(
worker_failure_data->CopyFrom(request.worker_failure());
worker_failure_data->set_is_alive(false);
for (auto &listener : worker_dead_listeners_) {
listener(worker_failure_data);
}
auto on_done = [this, worker_address, worker_id, node_id, worker_failure_data, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
@@ -130,5 +134,11 @@ void GcsWorkerManager::HandleAddWorkerInfo(const rpc::AddWorkerInfoRequest &requ
}
}
void GcsWorkerManager::AddWorkerDeadListener(
std::function<void(std::shared_ptr<WorkerTableData>)> listener) {
RAY_CHECK(listener != nullptr);
worker_dead_listeners_.emplace_back(std::move(listener));
}
} // namespace gcs
} // namespace ray
@@ -45,9 +45,14 @@ class GcsWorkerManager : public rpc::WorkerInfoHandler {
rpc::AddWorkerInfoReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void AddWorkerDeadListener(
std::function<void(std::shared_ptr<WorkerTableData>)> listener);
private:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::vector<std::function<void(std::shared_ptr<WorkerTableData>)>>
worker_dead_listeners_;
};
} // namespace gcs