Fix node manager miss object info bug (#8337)

This commit is contained in:
fangfengbin
2020-05-07 20:16:42 +08:00
committed by GitHub
parent bc8b606ad7
commit 620ea94873
+30 -25
View File
@@ -211,33 +211,38 @@ ray::Status NodeManager::RegisterGcs() {
NodeRemoved(data);
}
};
// If the node resource message is received first and then the node message is received,
// ForwardTask will throw exception, because it can't get node info.
auto on_done = [this](Status status) {
RAY_CHECK_OK(status);
// Subscribe to resource changes.
const auto &resources_changed =
[this](const ClientID &id,
const gcs::ResourceChangeNotification &resource_notification) {
if (resource_notification.IsAdded()) {
ResourceSet resource_set;
for (auto &entry : resource_notification.GetData()) {
resource_set.AddOrUpdateResource(entry.first,
entry.second->resource_capacity());
}
ResourceCreateUpdated(id, resource_set);
} else {
RAY_CHECK(resource_notification.IsRemoved());
std::vector<std::string> resource_names;
for (auto &entry : resource_notification.GetData()) {
resource_names.push_back(entry.first);
}
ResourceDeleted(id, resource_names);
}
};
RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeToResources(
/*subscribe_callback=*/resources_changed,
/*done_callback=*/nullptr));
};
// Register a callback to monitor new nodes and a callback to monitor removed nodes.
RAY_RETURN_NOT_OK(
gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, nullptr));
// Subscribe to resource changes.
const auto &resources_changed =
[this](const ClientID &id,
const gcs::ResourceChangeNotification &resource_notification) {
if (resource_notification.IsAdded()) {
ResourceSet resource_set;
for (auto &entry : resource_notification.GetData()) {
resource_set.AddOrUpdateResource(entry.first,
entry.second->resource_capacity());
}
ResourceCreateUpdated(id, resource_set);
} else {
RAY_CHECK(resource_notification.IsRemoved());
std::vector<std::string> resource_names;
for (auto &entry : resource_notification.GetData()) {
resource_names.push_back(entry.first);
}
ResourceDeleted(id, resource_names);
}
};
RAY_RETURN_NOT_OK(gcs_client_->Nodes().AsyncSubscribeToResources(
/*subscribe_callback=*/resources_changed,
/*done_callback=*/nullptr));
gcs_client_->Nodes().AsyncSubscribeToNodeChange(on_node_change, on_done));
// Subscribe to heartbeat batches from the monitor.
const auto &heartbeat_batch_added =