[GCS] Move node resource info from client table to resource table (#5050)

This commit is contained in:
Kai Yang
2019-07-11 13:17:19 +08:00
committed by Hao Chen
parent 691c9733f9
commit 43b6513d19
18 changed files with 215 additions and 334 deletions
+2
View File
@@ -16,6 +16,7 @@ from ray.core.generated.gcs_pb2 import (
TablePrefix,
TablePubsub,
TaskTableData,
ResourceTableData,
)
__all__ = [
@@ -32,6 +33,7 @@ __all__ = [
"TablePrefix",
"TablePubsub",
"TaskTableData",
"ResourceTableData",
"construct_error_message",
]
+48 -29
View File
@@ -50,47 +50,35 @@ def _parse_client_table(redis_client):
for entry in gcs_entry.entries:
client = gcs_utils.ClientTableData.FromString(entry)
resources = {
client.resources_total_label[i]: client.resources_total_capacity[i]
for i in range(len(client.resources_total_label))
}
client_id = ray.utils.binary_to_hex(client.client_id)
if client.entry_type == gcs_utils.ClientTableData.INSERTION:
if client.is_insertion:
ordered_client_ids.append(client_id)
node_info[client_id] = {
"ClientID": client_id,
"EntryType": client.entry_type,
"IsInsertion": client.is_insertion,
"NodeManagerAddress": client.node_manager_address,
"NodeManagerPort": client.node_manager_port,
"ObjectManagerPort": client.object_manager_port,
"ObjectStoreSocketName": client.object_store_socket_name,
"RayletSocketName": client.raylet_socket_name,
"Resources": resources
"RayletSocketName": client.raylet_socket_name
}
# If this client is being updated, then it must
# If this client is being removed, then it must
# have previously been inserted, and
# it cannot have previously been removed.
else:
assert client_id in node_info, "Client not found!"
is_deletion = (node_info[client_id]["EntryType"] !=
gcs_utils.ClientTableData.DELETION)
assert is_deletion, "Unexpected updation of deleted client."
res_map = node_info[client_id]["Resources"]
if client.entry_type == gcs_utils.ClientTableData.RES_CREATEUPDATE:
for res in resources:
res_map[res] = resources[res]
elif client.entry_type == gcs_utils.ClientTableData.RES_DELETE:
for res in resources:
res_map.pop(res, None)
elif client.entry_type == gcs_utils.ClientTableData.DELETION:
pass # Do nothing with the resmap if client deletion
else:
raise RuntimeError("Unexpected EntryType {}".format(
client.entry_type))
node_info[client_id]["Resources"] = res_map
node_info[client_id]["EntryType"] = client.entry_type
assert node_info[client_id]["IsInsertion"], (
"Unexpected duplicate removal of client.")
node_info[client_id]["IsInsertion"] = client.is_insertion
# Fill resource info.
for client_id in ordered_client_ids:
if node_info[client_id]["IsInsertion"]:
resources = _parse_resource_table(redis_client, client_id)
else:
resources = {}
node_info[client_id]["Resources"] = resources
# NOTE: We return the list comprehension below instead of simply doing
# 'list(node_info.values())' in order to have the nodes appear in the order
# that they joined the cluster. Python dictionaries do not preserve
@@ -100,6 +88,38 @@ def _parse_client_table(redis_client):
return [node_info[client_id] for client_id in ordered_client_ids]
def _parse_resource_table(redis_client, client_id):
"""Read the resource table with given client id.
Args:
redis_client: A client to the primary Redis shard.
client_id: The client ID of the node in hex.
Returns:
A dict of resources about this node.
"""
message = redis_client.execute_command(
"RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("NODE_RESOURCE"), "",
ray.utils.hex_to_binary(client_id))
if message is None:
return {}
resources = {}
gcs_entry = gcs_utils.GcsEntry.FromString(message)
entries_len = len(gcs_entry.entries)
if entries_len % 2 != 0:
raise Exception("Invalid entry size for resource lookup: " +
str(entries_len))
for i in range(0, entries_len, 2):
resource_table_data = gcs_utils.ResourceTableData.FromString(
gcs_entry.entries[i + 1])
resources[decode(
gcs_entry.entries[i])] = resource_table_data.resource_capacity
return resources
class GlobalState(object):
"""A class used to interface with the Ray control state.
@@ -800,7 +820,7 @@ class GlobalState(object):
clients = self.client_table()
for client in clients:
# Only count resources from latest entries of live clients.
if client["EntryType"] != gcs_utils.ClientTableData.DELETION:
if client["IsInsertion"]:
for key, value in client["Resources"].items():
resources[key] += value
return dict(resources)
@@ -809,8 +829,7 @@ class GlobalState(object):
"""Returns a set of client IDs corresponding to clients still alive."""
return {
client["ClientID"]
for client in self.client_table()
if (client["EntryType"] != gcs_utils.ClientTableData.DELETION)
for client in self.client_table() if (client["IsInsertion"])
}
def available_resources(self):
+1 -3
View File
@@ -8,7 +8,6 @@ import time
import redis
import ray
from ray.gcs_utils import ClientTableData
logger = logging.getLogger(__name__)
@@ -176,8 +175,7 @@ class Cluster(object):
while time.time() - start_time < timeout:
clients = ray.state._parse_client_table(redis_client)
live_clients = [
client for client in clients
if client["EntryType"] == ClientTableData.INSERTION
client for client in clients if client["IsInsertion"]
]
expected = len(self.list_all_nodes())