diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 156b1747e..eaf65368e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -211,33 +211,38 @@ ray::Status NodeManager::RegisterGcs() { NodeRemoved(data); } }; + + // If the node resource message is received first and then the node message is received, + // ForwardTask will throw exception, because it can't get node info. + auto on_done = [this](Status status) { + RAY_CHECK_OK(status); + // Subscribe to resource changes. + const auto &resources_changed = + [this](const ClientID &id, + const gcs::ResourceChangeNotification &resource_notification) { + if (resource_notification.IsAdded()) { + ResourceSet resource_set; + for (auto &entry : resource_notification.GetData()) { + resource_set.AddOrUpdateResource(entry.first, + entry.second->resource_capacity()); + } + ResourceCreateUpdated(id, resource_set); + } else { + RAY_CHECK(resource_notification.IsRemoved()); + std::vector resource_names; + for (auto &entry : resource_notification.GetData()) { + resource_names.push_back(entry.first); + } + ResourceDeleted(id, resource_names); + } + }; + RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToResources( + /*subscribe_callback=*/resources_changed, + /*done_callback=*/nullptr)); + }; // Register a callback to monitor new nodes and a callback to monitor removed nodes. RAY_RETURN_NOT_OK( - gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr)); - - // Subscribe to resource changes. - const auto &resources_changed = - [this](const ClientID &id, - const gcs::ResourceChangeNotification &resource_notification) { - if (resource_notification.IsAdded()) { - ResourceSet resource_set; - for (auto &entry : resource_notification.GetData()) { - resource_set.AddOrUpdateResource(entry.first, - entry.second->resource_capacity()); - } - ResourceCreateUpdated(id, resource_set); - } else { - RAY_CHECK(resource_notification.IsRemoved()); - std::vector resource_names; - for (auto &entry : resource_notification.GetData()) { - resource_names.push_back(entry.first); - } - ResourceDeleted(id, resource_names); - } - }; - RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeToResources( - /*subscribe_callback=*/resources_changed, - /*done_callback=*/nullptr)); + gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, on_done)); // Subscribe to heartbeat batches from the monitor. const auto &heartbeat_batch_added =