mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 18:45:03 +08:00
[Dashboard] displays resources row (#6516)
This commit is contained in:
committed by
Philipp Moritz
parent
840d9c126f
commit
166560e428
@@ -43,6 +43,21 @@ def to_unix_time(dt):
|
||||
return (dt - datetime.datetime(1970, 1, 1)).total_seconds()
|
||||
|
||||
|
||||
def round_resource_value(quantity):
    """Return ``quantity`` in a compact form suitable for display.

    Args:
        quantity: Resource amount, as a float or an int.

    Returns:
        An ``int`` when ``quantity`` has no fractional part (so it renders
        without a trailing ``.0``), otherwise the value rounded to two
        decimal places.
    """
    # Plain ints only gained ``is_integer()`` in Python 3.12; handle them
    # explicitly so whole-number inputs never raise AttributeError.
    if isinstance(quantity, int):
        return int(quantity)
    if quantity.is_integer():
        return int(quantity)
    return round(quantity, 2)
|
||||
|
||||
|
||||
def format_resource(resource_name, quantity):
    """Render a resource quantity as a display string.

    Args:
        resource_name: Name of the resource, e.g. ``"memory"``.
        quantity: Numeric amount of the resource. For ``"memory"`` and
            ``"object_store_memory"`` it is treated as a count of 50 MiB
            units.

    Returns:
        The quantity rounded by ``round_resource_value`` as a string.
        Memory resources are converted to GiB first and suffixed with
        ``" GiB"``.
    """
    if resource_name in ("object_store_memory", "memory"):
        # Memory quantities are counted in 50 MiB units: scale the count to
        # bytes, then express the result in GiB.
        bytes_per_unit = 50 * 1024 * 1024
        bytes_per_gib = 1024 * 1024 * 1024
        quantity = quantity * bytes_per_unit / bytes_per_gib
        return f"{round_resource_value(quantity)} GiB"
    return f"{round_resource_value(quantity)}"
|
||||
|
||||
|
||||
class Dashboard(object):
|
||||
"""A dashboard process for monitoring Ray nodes.
|
||||
|
||||
@@ -144,6 +159,17 @@ class Dashboard(object):
|
||||
|
||||
async def raylet_info(req) -> aiohttp.web.Response:
    """Return the raylet stats with a per-node resource usage summary.

    Each entry returned by ``self.raylet_stats.get_raylet_stats()`` (``self``
    comes from the enclosing scope) gets an ``"extraInfo"`` string of the
    form ``"<name>: <occupied> / <total>, ..."``, with resources listed in
    sorted name order. The annotated mapping is sent back as JSON.
    """
    stats_by_node = self.raylet_stats.get_raylet_stats()
    for node_stats in stats_by_node.values():
        available = node_stats["availableResources"]
        totals = node_stats["totalResources"]
        usage_parts = []
        for resource_name in sorted(available):
            raw_total = totals[resource_name]
            # Occupied is whatever part of the total is not currently available.
            raw_occupied = raw_total - available[resource_name]
            total_text = format_resource(resource_name, raw_total)
            occupied_text = format_resource(resource_name, raw_occupied)
            usage_parts.append(f"{resource_name}: {occupied_text} / {total_text}")
        node_stats["extraInfo"] = ", ".join(usage_parts)
    return await json_response(result=stats_by_node)
|
||||
|
||||
async def logs(req) -> aiohttp.web.Response:
|
||||
|
||||
@@ -82,6 +82,8 @@ message ViewData {
|
||||
// Reply carrying a node's statistics: worker stats, view data and the
// node's resource amounts.
message NodeStatsReply {
|
||||
repeated WorkerStats workers_stats = 1;
|
||||
repeated ViewData view_data = 2;
|
||||
// Available amount per resource name. NodeManager::HandleNodeStatsRequest
// writes one entry for every key in total_resources, using 0.0 when the
// resource is absent from the node's available set.
map<string, double> available_resources = 3;
|
||||
// Total amount per resource name for the node.
map<string, double> total_resources = 4;
|
||||
}
|
||||
|
||||
// Service for inter-node-manager communication.
|
||||
|
||||
@@ -2984,6 +2984,23 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request,
|
||||
worker_stats->set_pid(driver->Pid());
|
||||
worker_stats->set_is_driver(true);
|
||||
}
|
||||
// Record available resources of this node.
|
||||
const auto &available_resources =
|
||||
cluster_resource_map_.at(client_id_).GetAvailableResources().GetResourceMap();
|
||||
// Record total resources of this node.
|
||||
const auto &total_resources =
|
||||
cluster_resource_map_.at(client_id_).GetTotalResources().GetResourceMap();
|
||||
auto available_resources_map = reply->mutable_available_resources();
|
||||
auto total_resources_map = reply->mutable_total_resources();
|
||||
for (const auto &pair : total_resources) {
|
||||
(*total_resources_map)[pair.first] = pair.second;
|
||||
auto it = available_resources.find(pair.first);
|
||||
if (it != available_resources.end()) {
|
||||
(*available_resources_map)[pair.first] = it->second;
|
||||
} else {
|
||||
(*available_resources_map)[pair.first] = 0.0;
|
||||
}
|
||||
}
|
||||
// Ensure we never report an empty set of metrics.
|
||||
if (!recorded_metrics_) {
|
||||
RecordMetrics();
|
||||
|
||||
Reference in New Issue
Block a user