From 382b314241ef0c475b0acf1d9e1bcebb7742848c Mon Sep 17 00:00:00 2001 From: ZhuSenlin Date: Wed, 22 Jul 2020 10:55:51 +0800 Subject: [PATCH] [GCS] fix the fault tolerance about gcs node manager (#9380) --- python/ray/tests/test_gcs_fault_tolerance.py | 64 ++++++++++++++++++++ src/ray/gcs/gcs_server/gcs_node_manager.cc | 21 +++++-- src/ray/gcs/gcs_server/gcs_node_manager.h | 8 +++ src/ray/gcs/gcs_server/gcs_server.cc | 5 ++ 4 files changed, 93 insertions(+), 5 deletions(-) diff --git a/python/ray/tests/test_gcs_fault_tolerance.py b/python/ray/tests/test_gcs_fault_tolerance.py index 2ebf21a86..ab6ed0509 100644 --- a/python/ray/tests/test_gcs_fault_tolerance.py +++ b/python/ray/tests/test_gcs_fault_tolerance.py @@ -1,6 +1,12 @@ import sys import ray +import pytest +from ray.test_utils import ( + generate_internal_config_map, + wait_for_condition, + wait_for_pid_to_exit, +) @ray.remote @@ -48,6 +54,64 @@ def test_gcs_server_restart_during_actor_creation(ray_start_regular): assert len(unready) == 0 +@pytest.mark.parametrize( + "ray_start_cluster_head", + [generate_internal_config_map(num_heartbeats_timeout=20)], + indirect=True) +def test_node_failure_detector_when_gcs_server_restart(ray_start_cluster_head): + """Checks that the node failure detector is correct when gcs server restart. + + We set the cluster to timeout nodes after 2 seconds of heartbeats. We then + kill gcs server and remove the worker node and restart gcs server again to + check that the removed node will die finally. + """ + cluster = ray_start_cluster_head + worker = cluster.add_node() + cluster.wait_for_nodes() + + # Make sure both head and worker node are alive. + nodes = ray.nodes() + assert len(nodes) == 2 + assert nodes[0]["alive"] and nodes[1]["alive"] + + to_be_removed_node = None + for node in nodes: + if node["RayletSocketName"] == worker.raylet_socket_name: + to_be_removed_node = node + assert to_be_removed_node is not None + + head_node = cluster.head_node + gcs_server_process = head_node.all_processes["gcs_server"][0].process + gcs_server_pid = gcs_server_process.pid + # Kill gcs server. + cluster.head_node.kill_gcs_server() + # Wait to prevent the gcs server process becoming zombie. + gcs_server_process.wait() + wait_for_pid_to_exit(gcs_server_pid, 1000) + + raylet_process = worker.all_processes["raylet"][0].process + raylet_pid = raylet_process.pid + # Remove worker node. + cluster.remove_node(worker, allow_graceful=False) + # Wait to prevent the raylet process becoming zombie. + raylet_process.wait() + wait_for_pid_to_exit(raylet_pid) + + # Restart gcs server process. + cluster.head_node.start_gcs_server() + + def condition(): + nodes = ray.nodes() + assert len(nodes) == 2 + for node in nodes: + if node["NodeID"] == to_be_removed_node["NodeID"]: + return not node["alive"] + return False + + # Wait for the removed node dead. + assert wait_for_condition(condition, timeout=10) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 645e31661..37c19e6f5 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -31,8 +31,13 @@ GcsNodeManager::NodeFailureDetector::NodeFailureDetector( num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), light_heartbeat_enabled_(RayConfig::instance().light_heartbeat_enabled()), detect_timer_(io_service), - gcs_pub_sub_(std::move(gcs_pub_sub)) { - Tick(); + gcs_pub_sub_(std::move(gcs_pub_sub)) {} + +void GcsNodeManager::NodeFailureDetector::Start() { + if (!is_started_) { + Tick(); + is_started_ = true; + } } void GcsNodeManager::NodeFailureDetector::AddNode(const ray::ClientID &node_id) { @@ -129,7 +134,7 @@ GcsNodeManager::GcsNodeManager(boost::asio::io_service &io_service, RAY_CHECK_OK( gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); }; - RAY_CHECK_OK(gcs_table_storage_->NodeTable().Delete(node_id, on_done)); + RAY_CHECK_OK(gcs_table_storage_->NodeTable().Put(node_id, *node, on_done)); } })), gcs_pub_sub_(gcs_pub_sub), @@ -381,7 +386,9 @@ void GcsNodeManager::LoadInitialData(const EmptyCallback &done) { const std::unordered_map &result) { for (auto &item : result) { if (item.second.state() == rpc::GcsNodeInfo::ALIVE) { - alive_nodes_.emplace(item.first, std::make_shared(item.second)); + // Call `AddNode` for this node to make sure it is tracked by the failure + // detector. + AddNode(std::make_shared(item.second)); } else if (item.second.state() == rpc::GcsNodeInfo::DEAD) { dead_nodes_.emplace(item.first, std::make_shared(item.second)); } @@ -390,7 +397,9 @@ void GcsNodeManager::LoadInitialData(const EmptyCallback &done) { auto get_node_resource_callback = [this, done](const std::unordered_map &result) { for (auto &item : result) { - cluster_resources_.emplace(item.first, item.second); + if (alive_nodes_.count(item.first)) { + cluster_resources_[item.first] = item.second; + } } RAY_LOG(INFO) << "Finished loading initial data."; done(); @@ -401,5 +410,7 @@ void GcsNodeManager::LoadInitialData(const EmptyCallback &done) { RAY_CHECK_OK(gcs_table_storage_->NodeTable().GetAll(get_node_callback)); } +void GcsNodeManager::StartNodeFailureDetector() { node_failure_detector_->Start(); } + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index f9b4124d0..133ff664d 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -140,6 +140,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { /// \param done Callback that will be called when load is complete. void LoadInitialData(const EmptyCallback &done); + /// Start node failure detector. + void StartNodeFailureDetector(); + protected: class NodeFailureDetector { public: @@ -156,6 +159,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::shared_ptr gcs_pub_sub, std::function on_node_death_callback); + /// Start failure detector. + void Start(); + /// Register node to this detector. /// Only if the node has registered, its heartbeat data will be accepted. /// @@ -203,6 +209,8 @@ class GcsNodeManager : public rpc::NodeInfoHandler { absl::flat_hash_map heartbeat_buffer_; /// A publisher for publishing gcs messages. std::shared_ptr gcs_pub_sub_; + /// Is the detect started. + bool is_started_ = false; }; private: diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 7fc705091..04eb0b4c0 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -119,6 +119,11 @@ void GcsServer::Start() { // Store gcs rpc server address in redis. StoreGcsServerAddressInRedis(); + + // Only after the rpc_server_ is running can the node failure detector be run. + // Otherwise the node failure detector will mistake some living nodes as dead + // as the timer inside node failure detector is already run. + gcs_node_manager_->StartNodeFailureDetector(); is_started_ = true; }; gcs_actor_manager_->LoadInitialData(actor_manager_load_initial_data_callback);