diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ea2d5cae2..ea5c8b4f7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -373,7 +373,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Register a callback to monitor removed nodes. auto on_node_change = [this](const NodeID &node_id, const rpc::GcsNodeInfo &data) { if (data.state() == rpc::GcsNodeInfo::DEAD) { - OnNodeRemoved(data); + OnNodeRemoved(node_id); } }; RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); @@ -653,8 +653,7 @@ void CoreWorker::RunIOService() { io_service_.run(); } -void CoreWorker::OnNodeRemoved(const rpc::GcsNodeInfo &node_info) { - const auto node_id = NodeID::FromBinary(node_info.node_id()); +void CoreWorker::OnNodeRemoved(const NodeID &node_id) { RAY_LOG(INFO) << "Node failure " << node_id; const auto lost_objects = reference_counter_->ResetObjectsOnRemovedNode(node_id); // Delete the objects from the in-memory store to indicate that they are not diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 597493c38..088ba346a 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1058,7 +1058,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } /// Handler if a raylet node is removed from the cluster. - void OnNodeRemoved(const rpc::GcsNodeInfo &node_info); + void OnNodeRemoved(const NodeID &node_id); /// Request the spillage of an object that we own from the primary that hosts /// the primary copy to spill. diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 08c1e3500..ea2425caa 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -558,7 +558,12 @@ void ServiceBasedNodeInfoAccessor::HandleNotification(const GcsNodeInfo &node_in // Add the notification to our cache. RAY_LOG(INFO) << "Received notification for node id = " << node_id << ", IsAlive = " << is_alive; - node_cache_[node_id] = node_info; + if (is_alive) { + node_cache_[node_id] = node_info; + } else { + node_cache_[node_id].set_state(rpc::GcsNodeInfo::DEAD); + node_cache_[node_id].set_timestamp(node_info.timestamp()); + } // If the notification is new, call registered callback. if (is_notif_new) { diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 8f9da5b20..d6dcbb5c6 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -57,13 +57,17 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ node->set_state(rpc::GcsNodeInfo::DEAD); node->set_timestamp(current_sys_time_ms()); AddDeadNodeToCache(node); + auto node_info_delta = std::make_shared(); + node_info_delta->set_node_id(node->node_id()); + node_info_delta->set_state(node->state()); + node_info_delta->set_timestamp(node->timestamp()); - auto on_done = [this, node_id, node, reply, + auto on_done = [this, node_id, node_info_delta, reply, send_reply_callback](const Status &status) { - auto on_done = [this, node_id, node, reply, + auto on_done = [this, node_id, node_info_delta, reply, send_reply_callback](const Status &status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), - node->SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_pub_sub_->Publish( + NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id; }; @@ -179,10 +183,15 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id) { node->set_state(rpc::GcsNodeInfo::DEAD); node->set_timestamp(current_sys_time_ms()); AddDeadNodeToCache(node); - auto on_done = [this, node_id, node](const Status &status) { - auto on_done = [this, node_id, node](const Status &status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), - node->SerializeAsString(), nullptr)); + auto node_info_delta = std::make_shared(); + node_info_delta->set_node_id(node->node_id()); + node_info_delta->set_state(node->state()); + node_info_delta->set_timestamp(node->timestamp()); + + auto on_done = [this, node_id, node_info_delta](const Status &status) { + auto on_done = [this, node_id, node_info_delta](const Status &status) { + RAY_CHECK_OK(gcs_pub_sub_->Publish( + NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr)); }; RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); }; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ce00ae20f..a0fc0fa14 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -255,7 +255,7 @@ ray::Status NodeManager::RegisterGcs() { NodeAdded(data); } else { RAY_CHECK(data.state() == GcsNodeInfo::DEAD); - NodeRemoved(data); + NodeRemoved(node_id); } }; @@ -680,10 +680,9 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) { })); } -void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) { +void NodeManager::NodeRemoved(const NodeID &node_id) { // TODO(swang): If we receive a notification for our own death, clean up and // exit immediately. - const NodeID node_id = NodeID::FromBinary(node_info.node_id()); RAY_LOG(DEBUG) << "[NodeRemoved] Received callback from node id " << node_id; RAY_CHECK(node_id != self_node_id_) @@ -717,7 +716,7 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) { // Clean up workers that were owned by processes that were on the failed // node. rpc::Address address; - address.set_raylet_id(node_info.node_id()); + address.set_raylet_id(node_id.Binary()); HandleUnexpectedWorkerFailure(address); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 4b9b98d19..43f19e05b 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -182,9 +182,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler, void NodeAdded(const GcsNodeInfo &data); /// Handler for the removal of a GCS node. - /// \param node_info Data associated with the removed node. + /// \param node_id Id of the removed node. /// \return Void. - void NodeRemoved(const GcsNodeInfo &node_info); + void NodeRemoved(const NodeID &node_id); /// Handler for the addition or updation of a resource in the GCS /// \param node_id ID of the node that created or updated resources.