[GCS] fix the fault tolerance about gcs node manager (#9380)

This commit is contained in:
ZhuSenlin
2020-07-22 10:55:51 +08:00
committed by GitHub
parent cd42450fc1
commit 382b314241
4 changed files with 93 additions and 5 deletions
@@ -1,6 +1,12 @@
import sys
import ray
import pytest
from ray.test_utils import (
generate_internal_config_map,
wait_for_condition,
wait_for_pid_to_exit,
)
@ray.remote
@@ -48,6 +54,64 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular):
assert len(unready) == 0
@pytest.mark.parametrize(
"ray_start_cluster_head",
[generate_internal_config_map(num_heartbeats_timeout=20)],
indirect=True)
def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head):
"""Checks that the node failure detector is correct when gcs server restart.
We set the cluster to timeout nodes after 2 seconds of heartbeats. We then
kill gcs server and remove the worker node and restart gcs server again to
check that the removed node will die finally.
"""
cluster = ray_start_cluster_head
worker = cluster.add_node()
cluster.wait_for_nodes()
# Make sure both head and worker node are alive.
nodes = ray.nodes()
assert len(nodes) == 2
assert nodes[0]["alive"] and nodes[1]["alive"]
to_be_removed_node = None
for node in nodes:
if node["RayletSocketName"] == worker.raylet_socket_name:
to_be_removed_node = node
assert to_be_removed_node is not None
head_node = cluster.head_node
gcs_server_process = head_node.all_processes["gcs_server"][0].process
gcs_server_pid = gcs_server_process.pid
# Kill gcs server.
cluster.head_node.kill_gcs_server()
# Wait to prevent the gcs server process becoming zombie.
gcs_server_process.wait()
wait_for_pid_to_exit(gcs_server_pid, 1000)
raylet_process = worker.all_processes["raylet"][0].process
raylet_pid = raylet_process.pid
# Remove worker node.
cluster.remove_node(worker, allow_graceful=False)
# Wait to prevent the raylet process becoming zombie.
raylet_process.wait()
wait_for_pid_to_exit(raylet_pid)
# Restart gcs server process.
cluster.head_node.start_gcs_server()
def condition():
nodes = ray.nodes()
assert len(nodes) == 2
for node in nodes:
if node["NodeID"] == to_be_removed_node["NodeID"]:
return not node["alive"]
return False
# Wait for the removed node dead.
assert wait_for_condition(condition, timeout=10)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+16 -5
View File
@@ -31,8 +31,13 @@ GcsNodeManager::NodeFailureDetector::NodeFailureDetector(
num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()),
light_heartbeat_enabled_(RayConfig::instance().light_heartbeat_enabled()),
detect_timer_(io_service),
gcs_pub_sub_(std::move(gcs_pub_sub)) {
Tick();
gcs_pub_sub_(std::move(gcs_pub_sub)) {}
void GcsNodeManager::NodeFailureDetector::Start() {
if (!is_started_) {
Tick();
is_started_ = true;
}
}
void GcsNodeManager::NodeFailureDetector::AddNode(const ray::ClientID &node_id) {
@@ -129,7 +134,7 @@ GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service,
RAY_CHECK_OK(
gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, on_done));
RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done));
}
})),
gcs_pub_sub_(gcs_pub_sub),
@@ -381,7 +386,9 @@ void GcsNodeManager::LoadInitialData(const EmptyCallback &done) {
const std::unordered_map<ClientID, GcsNodeInfo> &result) {
for (auto &item : result) {
if (item.second.state() == rpc::GcsNodeInfo::ALIVE) {
alive_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
// Call `AddNode` for this node to make sure it is tracked by the failure
// detector.
AddNode(std::make_shared<rpc::GcsNodeInfo>(item.second));
} else if (item.second.state() == rpc::GcsNodeInfo::DEAD) {
dead_nodes_.emplace(item.first, std::make_shared<rpc::GcsNodeInfo>(item.second));
}
@@ -390,7 +397,9 @@ void GcsNodeManager::LoadInitialData(const EmptyCallback &done) {
auto get_node_resource_callback =
[this, done](const std::unordered_map<ClientID, ResourceMap> &result) {
for (auto &item : result) {
cluster_resources_.emplace(item.first, item.second);
if (alive_nodes_.count(item.first)) {
cluster_resources_[item.first] = item.second;
}
}
RAY_LOG(INFO) << "Finished loading initial data.";
done();
@@ -401,5 +410,7 @@ void GcsNodeManager::LoadInitialData(const EmptyCallback &done) {
RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(get_node_callback));
}
void GcsNodeManager::StartNodeFailureDetector() { node_failure_detector_->Start(); }
} // namespace gcs
} // namespace ray
@@ -140,6 +140,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
/// \param done Callback that will be called when load is complete.
void LoadInitialData(const EmptyCallback &done);
/// Start node failure detector.
void StartNodeFailureDetector();
protected:
class NodeFailureDetector {
public:
@@ -156,6 +159,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::function<void(const ClientID &)> on_node_death_callback);
/// Start failure detector.
void Start();
/// Register node to this detector.
/// Only if the node has registered, its heartbeat data will be accepted.
///
@@ -203,6 +209,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
absl::flat_hash_map<ClientID, rpc::HeartbeatTableData> heartbeat_buffer_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// Is the detect started.
bool is_started_ = false;
};
private:
+5
View File
@@ -119,6 +119,11 @@ void GcsServer::Start() {
// Store gcs rpc server address in redis.
StoreGcsServerAddressInRedis();
// Only after the rpc_server_ is running can the node failure detector be run.
// Otherwise the node failure detector will mistake some living nodes as dead
// as the timer inside node failure detector is already run.
gcs_node_manager_->StartNodeFailureDetector();
is_started_ = true;
};
gcs_actor_manager_->LoadInitialData(actor_manager_load_initial_data_callback);