Don't include redundant entries in global_state.client_table (#2880)

This commit is contained in:
Richard Liaw
2018-09-17 12:52:49 -07:00
committed by Robert Nishihara
parent dc76e51a60
commit 899e4585bc
+17 -5
View File
@@ -508,10 +508,12 @@ class GlobalState(object):
if message is None:
return []
node_info = []
node_info = {}
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)
# Since GCS entries are append-only, we override so that
# only the latest entries are kept.
for i in range(gcs_entry.EntriesLength()):
client = (
ray.gcs_utils.ClientTableData.GetRootAsClientTableData(
@@ -522,8 +524,18 @@ class GlobalState(object):
client.ResourcesTotalCapacity(i)
for i in range(client.ResourcesTotalLabelLength())
}
node_info.append({
"ClientID": ray.utils.binary_to_hex(client.ClientId()),
client_id = ray.utils.binary_to_hex(client.ClientId())
# If this client is being removed, then it must
# have previously been inserted, and
# it cannot have previously been removed.
if not client.IsInsertion():
assert client_id in node_info, "Client removed not found!"
assert node_info[client_id]["IsInsertion"], (
"Unexpected duplicate removal of client.")
node_info[client_id] = {
"ClientID": client_id,
"IsInsertion": client.IsInsertion(),
"NodeManagerAddress": decode(client.NodeManagerAddress()),
"NodeManagerPort": client.NodeManagerPort(),
@@ -532,8 +544,8 @@ class GlobalState(object):
client.ObjectStoreSocketName()),
"RayletSocketName": decode(client.RayletSocketName()),
"Resources": resources
})
return node_info
}
return list(node_info.values())
def log_files(self):
"""Fetch and return a dictionary of log file names to outputs.