Rename ClientTableData to GcsNodeInfo (#5251)

This commit is contained in:
micafan
2019-07-30 11:22:47 +08:00
committed by Hao Chen
parent 3ba8680963
commit b3bcf59148
21 changed files with 331 additions and 337 deletions
+2 -2
View File
@@ -4,7 +4,7 @@ from __future__ import print_function
from ray.core.generated.gcs_pb2 import (
ActorCheckpointIdData,
ClientTableData,
GcsNodeInfo,
JobTableData,
ErrorTableData,
ErrorType,
@@ -21,7 +21,7 @@ from ray.core.generated.gcs_pb2 import (
__all__ = [
"ActorCheckpointIdData",
"ClientTableData",
"GcsNodeInfo",
"JobTableData",
"ErrorTableData",
"ErrorType",
+2 -3
View File
@@ -279,13 +279,12 @@ class Monitor(object):
all_raylet_nodes = ray.nodes()
self.raylet_id_to_ip_map = {}
for raylet_info in all_raylet_nodes:
client_id = (raylet_info.get("DBClientID")
or raylet_info["ClientID"])
node_id = (raylet_info.get("DBClientID") or raylet_info["NodeID"])
ip_address = (raylet_info.get("AuxAddress")
or raylet_info["NodeManagerAddress"]).split(":")[0]
if _append_port:
ip_address += ":" + str(raylet_info["NodeManagerPort"])
self.raylet_id_to_ip_map[client_id] = ip_address
self.raylet_id_to_ip_map[node_id] = ip_address
def _maybe_flush_gcs(self):
"""Experimental: issue a flush request to the GCS.
+37 -37
View File
@@ -43,49 +43,49 @@ def _parse_client_table(redis_client):
node_info = {}
gcs_entry = gcs_utils.GcsEntry.FromString(message)
ordered_client_ids = []
ordered_node_ids = []
# Since GCS entries are append-only, we override so that
# only the latest entries are kept.
for entry in gcs_entry.entries:
client = gcs_utils.ClientTableData.FromString(entry)
item = gcs_utils.GcsNodeInfo.FromString(entry)
client_id = ray.utils.binary_to_hex(client.client_id)
node_id = ray.utils.binary_to_hex(item.node_id)
if client.is_insertion:
ordered_client_ids.append(client_id)
node_info[client_id] = {
"ClientID": client_id,
"IsInsertion": client.is_insertion,
"NodeManagerAddress": client.node_manager_address,
"NodeManagerPort": client.node_manager_port,
"ObjectManagerPort": client.object_manager_port,
"ObjectStoreSocketName": client.object_store_socket_name,
"RayletSocketName": client.raylet_socket_name
if item.state == gcs_utils.GcsNodeInfo.GcsNodeState.Value("ALIVE"):
ordered_node_ids.append(node_id)
node_info[node_id] = {
"NodeID": node_id,
"Alive": True,
"NodeManagerAddress": item.node_manager_address,
"NodeManagerPort": item.node_manager_port,
"ObjectManagerPort": item.object_manager_port,
"ObjectStoreSocketName": item.object_store_socket_name,
"RayletSocketName": item.raylet_socket_name
}
# If this client is being removed, then it must
# If this node is being removed, then it must
# have previously been inserted, and
# it cannot have previously been removed.
else:
assert client_id in node_info, "Client not found!"
assert node_info[client_id]["IsInsertion"], (
"Unexpected duplicate removal of client.")
node_info[client_id]["IsInsertion"] = client.is_insertion
assert node_id in node_info, "node not found!"
assert node_info[node_id]["Alive"], (
"Unexpected duplicate removal of node.")
node_info[node_id]["Alive"] = False
# Fill resource info.
for client_id in ordered_client_ids:
if node_info[client_id]["IsInsertion"]:
resources = _parse_resource_table(redis_client, client_id)
for node_id in ordered_node_ids:
if node_info[node_id]["Alive"]:
resources = _parse_resource_table(redis_client, node_id)
else:
resources = {}
node_info[client_id]["Resources"] = resources
node_info[node_id]["Resources"] = resources
# NOTE: We return the list comprehension below instead of simply doing
# 'list(node_info.values())' in order to have the nodes appear in the order
# that they joined the cluster. Python dictionaries do not preserve
# insertion order. We could use an OrderedDict, but then we'd have to be
# sure to only insert a given node a single time (clients that die appear
# twice in the GCS log).
return [node_info[client_id] for client_id in ordered_client_ids]
return [node_info[node_id] for node_id in ordered_node_ids]
def _parse_resource_table(redis_client, client_id):
@@ -402,7 +402,7 @@ class GlobalState(object):
for client in client_table:
# These are equivalent and is better for application developers.
client["alive"] = client["IsInsertion"]
client["alive"] = client["Alive"]
return client_table
def _job_table(self, job_id):
@@ -690,11 +690,11 @@ class GlobalState(object):
"""
self._check_connected()
client_id_to_address = {}
for client_info in self.client_table():
client_id_to_address[client_info["ClientID"]] = "{}:{}".format(
client_info["NodeManagerAddress"],
client_info["ObjectManagerPort"])
node_id_to_address = {}
for node_info in self.client_table():
node_id_to_address[node_info["NodeID"]] = "{}:{}".format(
node_info["NodeManagerAddress"],
node_info["ObjectManagerPort"])
all_events = []
@@ -705,13 +705,13 @@ class GlobalState(object):
for event in items:
if event["event_type"] == "transfer_send":
object_id, remote_client_id, _, _ = event["extra_data"]
object_id, remote_node_id, _, _ = event["extra_data"]
elif event["event_type"] == "transfer_receive":
object_id, remote_client_id, _, _ = event["extra_data"]
object_id, remote_node_id, _, _ = event["extra_data"]
elif event["event_type"] == "receive_pull_request":
object_id, remote_client_id = event["extra_data"]
object_id, remote_node_id = event["extra_data"]
else:
assert False, "This should be unreachable."
@@ -729,9 +729,9 @@ class GlobalState(object):
"name": event["event_type"],
# The identifier for the group of rows that the event
# appears in.
"pid": client_id_to_address[key],
"pid": node_id_to_address[key],
# The identifier for the row that the event appears in.
"tid": client_id_to_address[remote_client_id],
"tid": node_id_to_address[remote_node_id],
# The start time in microseconds.
"ts": self._seconds_to_microseconds(event["start_time"]),
# The duration in microseconds.
@@ -825,7 +825,7 @@ class GlobalState(object):
clients = self.client_table()
for client in clients:
# Only count resources from latest entries of live clients.
if client["IsInsertion"]:
if client["Alive"]:
for key, value in client["Resources"].items():
resources[key] += value
return dict(resources)
@@ -833,8 +833,8 @@ class GlobalState(object):
def _live_client_ids(self):
"""Returns a set of client IDs corresponding to clients still alive."""
return {
client["ClientID"]
for client in self.client_table() if (client["IsInsertion"])
client["NodeID"]
for client in self.client_table() if (client["Alive"])
}
def available_resources(self):
+1 -3
View File
@@ -174,9 +174,7 @@ class Cluster(object):
start_time = time.time()
while time.time() - start_time < timeout:
clients = ray.state._parse_client_table(redis_client)
live_clients = [
client for client in clients if client["IsInsertion"]
]
live_clients = [client for client in clients if client["Alive"]]
expected = len(self.list_all_nodes())
if len(live_clients) == expected:
+45 -45
View File
@@ -88,7 +88,7 @@ def test_dynamic_res_updation_clientid(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
target_clientid = ray.nodes()[1]["ClientID"]
target_node_id = ray.nodes()[1]["NodeID"]
@ray.remote
def set_res(resource_name, resource_capacity, client_id):
@@ -96,15 +96,15 @@ def test_dynamic_res_updation_clientid(ray_start_cluster):
resource_name, resource_capacity, client_id=client_id)
# Create resource
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
# Update resource
new_capacity = res_capacity + 1
ray.get(set_res.remote(res_name, new_capacity, target_clientid))
ray.get(set_res.remote(res_name, new_capacity, target_node_id))
target_client = next(client for client in ray.nodes()
if client["ClientID"] == target_clientid)
resources = target_client["Resources"]
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
assert res_name in resources
assert resources[res_name] == new_capacity
@@ -122,17 +122,17 @@ def test_dynamic_res_creation_clientid(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
target_clientid = ray.nodes()[1]["ClientID"]
target_node_id = ray.nodes()[1]["NodeID"]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
ray.experimental.set_resource(
resource_name, resource_capacity, client_id=res_client_id)
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
target_client = next(client for client in ray.nodes()
if client["ClientID"] == target_clientid)
resources = target_client["Resources"]
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
assert res_name in resources
assert resources[res_name] == res_capacity
@@ -152,7 +152,7 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
target_clientids = [client["ClientID"] for client in ray.nodes()]
target_node_ids = [node["NodeID"] for node in ray.nodes()]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
@@ -160,8 +160,8 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
resource_name, resource_capacity, client_id=res_client_id)
results = []
for cid in target_clientids:
results.append(set_res.remote(res_name, res_capacity, cid))
for nid in target_node_ids:
results.append(set_res.remote(res_name, res_capacity, nid))
ray.get(results)
success = False
@@ -169,10 +169,10 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster):
while time.time() - start_time < TIMEOUT and not success:
resources_created = []
for cid in target_clientids:
target_client = next(
client for client in ray.nodes() if client["ClientID"] == cid)
resources = target_client["Resources"]
for nid in target_node_ids:
target_node = next(
node for node in ray.nodes() if node["NodeID"] == nid)
resources = target_node["Resources"]
resources_created.append(resources[res_name] == res_capacity)
success = all(resources_created)
assert success
@@ -193,7 +193,7 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
target_clientid = ray.nodes()[1]["ClientID"]
target_node_id = ray.nodes()[1]["NodeID"]
# Launch the delete task
@ray.remote
@@ -201,11 +201,11 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster):
ray.experimental.set_resource(
resource_name, 0, client_id=res_client_id)
ray.get(delete_res.remote(res_name, target_clientid))
ray.get(delete_res.remote(res_name, target_node_id))
target_client = next(client for client in ray.nodes()
if client["ClientID"] == target_clientid)
resources = target_client["Resources"]
target_node = next(
node for node in ray.nodes() if node["NodeID"] == target_node_id)
resources = target_node["Resources"]
print(ray.cluster_resources())
assert res_name not in resources
@@ -225,7 +225,7 @@ def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
clientids = [client["ClientID"] for client in ray.nodes()]
node_ids = [node["NodeID"] for node in ray.nodes()]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
@@ -233,8 +233,8 @@ def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster):
resource_name, resource_capacity, client_id=res_client_id)
# Create the resource on node1
target_clientid = clientids[1]
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
target_node_id = node_ids[1]
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
# Define a task which requires this resource
@ray.remote(resources={res_name: res_capacity})
@@ -262,7 +262,7 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
clientids = [client["ClientID"] for client in ray.nodes()]
node_ids = [node["NodeID"] for node in ray.nodes()]
@ray.remote
def delete_res(resource_name, res_client_id):
@@ -275,12 +275,12 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster):
resource_name, resource_capacity, client_id=res_client_id)
# Create the resource on node1
target_clientid = clientids[1]
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
target_node_id = node_ids[1]
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
# Delete the resource
ray.get(delete_res.remote(res_name, target_clientid))
ray.get(delete_res.remote(res_name, target_node_id))
# Define a task which requires this resource. This should not run
@ray.remote(resources={res_name: res_capacity})
@@ -315,8 +315,8 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
clientids = [client["ClientID"] for client in ray.nodes()]
target_clientid = clientids[1]
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
@@ -324,7 +324,7 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster):
resource_name, resource_capacity, client_id=res_client_id)
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
# Task to hold the resource till the driver signals to finish
@@ -348,7 +348,7 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster):
ray.get(ray.ObjectID(TASK_RUNNING_OBJECT_ID_STR))
# Update the resource capacity
ray.get(set_res.remote(res_name, updated_capacity, target_clientid))
ray.get(set_res.remote(res_name, updated_capacity, target_node_id))
# Signal task to complete
ray.worker.global_worker.put_object(ray.ObjectID(WAIT_OBJECT_ID_STR), 1)
@@ -394,8 +394,8 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
clientids = [client["ClientID"] for client in ray.nodes()]
target_clientid = clientids[1]
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
@@ -403,7 +403,7 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
resource_name, resource_capacity, client_id=res_client_id)
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
# Task to hold the resource till the driver signals to finish
@@ -427,7 +427,7 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster):
ray.get(ray.ObjectID(TASK_RUNNING_OBJECT_ID_STR))
# Decrease the resource capacity
ray.get(set_res.remote(res_name, updated_capacity, target_clientid))
ray.get(set_res.remote(res_name, updated_capacity, target_node_id))
# Signal task to complete
ray.worker.global_worker.put_object(ray.ObjectID(WAIT_OBJECT_ID_STR), 1)
@@ -471,8 +471,8 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
clientids = [client["ClientID"] for client in ray.nodes()]
target_clientid = clientids[1]
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
@@ -485,7 +485,7 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
resource_name, 0, client_id=res_client_id)
# Create the resource on node 1
ray.get(set_res.remote(res_name, res_capacity, target_clientid))
ray.get(set_res.remote(res_name, res_capacity, target_node_id))
assert ray.cluster_resources()[res_name] == res_capacity
# Task to hold the resource till the driver signals to finish
@@ -509,7 +509,7 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster):
ray.get(ray.ObjectID(TASK_RUNNING_OBJECT_ID_STR))
# Delete the resource
ray.get(delete_res.remote(res_name, target_clientid))
ray.get(delete_res.remote(res_name, target_node_id))
# Signal task to complete
ray.worker.global_worker.put_object(ray.ObjectID(WAIT_OBJECT_ID_STR), 1)
@@ -540,8 +540,8 @@ def test_dynamic_res_creation_stress(ray_start_cluster):
ray.init(redis_address=cluster.redis_address)
clientids = [client["ClientID"] for client in ray.nodes()]
target_clientid = clientids[1]
node_ids = [node["NodeID"] for node in ray.nodes()]
target_node_id = node_ids[1]
@ray.remote
def set_res(resource_name, resource_capacity, res_client_id):
@@ -554,7 +554,7 @@ def test_dynamic_res_creation_stress(ray_start_cluster):
resource_name, 0, client_id=res_client_id)
results = [
set_res.remote(str(i), res_capacity, target_clientid)
set_res.remote(str(i), res_capacity, target_node_id)
for i in range(0, NUM_RES_TO_CREATE)
]
ray.get(results)
+3 -3
View File
@@ -656,7 +656,7 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes):
cluster = ray_start_cluster_2_nodes
cluster.wait_for_nodes()
client_ids = {item["ClientID"] for item in ray.nodes()}
node_ids = {item["NodeID"] for item in ray.nodes()}
# Try to make sure that the monitor has received at least one heartbeat
# from the node.
@@ -671,12 +671,12 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes):
# Extract the client IDs from the error messages. This will need to be
# changed if the error message changes.
warning_client_ids = {
warning_node_ids = {
item["message"].split(" ")[5]
for item in relevant_errors(ray_constants.REMOVED_NODE_ERROR)
}
assert client_ids == warning_client_ids
assert node_ids == warning_node_ids
def test_raylet_crash_when_get(ray_start_regular):