From b3bcf5914810fa3e556a84575c7521228d6aab03 Mon Sep 17 00:00:00 2001 From: micafan <550435771@qq.com> Date: Tue, 30 Jul 2019 11:22:47 +0800 Subject: [PATCH] Rename ClientTableData to GcsNodeInfo (#5251) --- .../java/org/ray/runtime/gcs/GcsClient.java | 32 ++-- python/ray/gcs_utils.py | 4 +- python/ray/monitor.py | 5 +- python/ray/state.py | 74 ++++----- python/ray/tests/cluster_utils.py | 4 +- python/ray/tests/test_dynres.py | 90 +++++------ python/ray/tests/test_failure.py | 6 +- src/ray/gcs/redis_gcs_client_test.cc | 67 +++++---- src/ray/gcs/tables.cc | 140 +++++++++--------- src/ray/gcs/tables.h | 58 ++++---- src/ray/object_manager/object_directory.cc | 14 +- .../test/object_manager_stress_test.cc | 25 ++-- .../test/object_manager_test.cc | 31 ++-- src/ray/protobuf/gcs.proto | 32 ++-- src/ray/raylet/monitor.cc | 7 +- src/ray/raylet/monitor.h | 2 +- src/ray/raylet/node_manager.cc | 23 ++- src/ray/raylet/node_manager.h | 8 +- .../raylet/object_manager_integration_test.cc | 22 +-- src/ray/raylet/raylet.cc | 22 +-- src/ray/raylet/raylet.h | 2 +- 21 files changed, 331 insertions(+), 337 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java index 6ce8ad20e..0465833b3 100644 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java @@ -16,7 +16,7 @@ import org.ray.api.id.UniqueId; import org.ray.api.runtimecontext.NodeInfo; import org.ray.runtime.generated.Gcs; import org.ray.runtime.generated.Gcs.ActorCheckpointIdData; -import org.ray.runtime.generated.Gcs.ClientTableData; +import org.ray.runtime.generated.Gcs.GcsNodeInfo; import org.ray.runtime.generated.Gcs.TablePrefix; import org.ray.runtime.util.IdUtil; import org.slf4j.Logger; @@ -60,40 +60,40 @@ public class GcsClient { return new ArrayList<>(); } - // This map is used for deduplication of client entries. 
- Map clients = new HashMap<>(); + // This map is used for deduplication of node entries. + Map nodes = new HashMap<>(); for (byte[] result : results) { Preconditions.checkNotNull(result); - ClientTableData data = null; + GcsNodeInfo data = null; try { - data = ClientTableData.parseFrom(result); + data = GcsNodeInfo.parseFrom(result); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("Received invalid protobuf data from GCS."); } - final UniqueId clientId = UniqueId - .fromByteBuffer(data.getClientId().asReadOnlyByteBuffer()); + final UniqueId nodeId = UniqueId + .fromByteBuffer(data.getNodeId().asReadOnlyByteBuffer()); - if (data.getIsInsertion()) { + if (data.getState() == GcsNodeInfo.GcsNodeState.ALIVE) { //Code path of node insertion. NodeInfo nodeInfo = new NodeInfo( - clientId, data.getNodeManagerAddress(), true, new HashMap<>()); - clients.put(clientId, nodeInfo); + nodeId, data.getNodeManagerAddress(), true, new HashMap<>()); + nodes.put(nodeId, nodeInfo); } else { // Code path of node deletion. - NodeInfo nodeInfo = new NodeInfo(clientId, clients.get(clientId).nodeAddress, + NodeInfo nodeInfo = new NodeInfo(nodeId, nodes.get(nodeId).nodeAddress, false, new HashMap<>()); - clients.put(clientId, nodeInfo); + nodes.put(nodeId, nodeInfo); } } // Fill resources. 
- for (Map.Entry client : clients.entrySet()) { - if (client.getValue().isAlive) { - client.getValue().resources.putAll(getResourcesForClient(client.getKey())); + for (Map.Entry node : nodes.entrySet()) { + if (node.getValue().isAlive) { + node.getValue().resources.putAll(getResourcesForClient(node.getKey())); } } - return new ArrayList<>(clients.values()); + return new ArrayList<>(nodes.values()); } private Map getResourcesForClient(UniqueId clientId) { diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index b000a1d60..2aed699c8 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -4,7 +4,7 @@ from __future__ import print_function from ray.core.generated.gcs_pb2 import ( ActorCheckpointIdData, - ClientTableData, + GcsNodeInfo, JobTableData, ErrorTableData, ErrorType, @@ -21,7 +21,7 @@ from ray.core.generated.gcs_pb2 import ( __all__ = [ "ActorCheckpointIdData", - "ClientTableData", + "GcsNodeInfo", "JobTableData", "ErrorTableData", "ErrorType", diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 2abd512c3..c4531d8b3 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -279,13 +279,12 @@ class Monitor(object): all_raylet_nodes = ray.nodes() self.raylet_id_to_ip_map = {} for raylet_info in all_raylet_nodes: - client_id = (raylet_info.get("DBClientID") - or raylet_info["ClientID"]) + node_id = (raylet_info.get("DBClientID") or raylet_info["NodeID"]) ip_address = (raylet_info.get("AuxAddress") or raylet_info["NodeManagerAddress"]).split(":")[0] if _append_port: ip_address += ":" + str(raylet_info["NodeManagerPort"]) - self.raylet_id_to_ip_map[client_id] = ip_address + self.raylet_id_to_ip_map[node_id] = ip_address def _maybe_flush_gcs(self): """Experimental: issue a flush request to the GCS. 
diff --git a/python/ray/state.py b/python/ray/state.py index d42f564a7..9ba192eb8 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -43,49 +43,49 @@ def _parse_client_table(redis_client): node_info = {} gcs_entry = gcs_utils.GcsEntry.FromString(message) - ordered_client_ids = [] + ordered_node_ids = [] # Since GCS entries are append-only, we override so that # only the latest entries are kept. for entry in gcs_entry.entries: - client = gcs_utils.ClientTableData.FromString(entry) + item = gcs_utils.GcsNodeInfo.FromString(entry) - client_id = ray.utils.binary_to_hex(client.client_id) + node_id = ray.utils.binary_to_hex(item.node_id) - if client.is_insertion: - ordered_client_ids.append(client_id) - node_info[client_id] = { - "ClientID": client_id, - "IsInsertion": client.is_insertion, - "NodeManagerAddress": client.node_manager_address, - "NodeManagerPort": client.node_manager_port, - "ObjectManagerPort": client.object_manager_port, - "ObjectStoreSocketName": client.object_store_socket_name, - "RayletSocketName": client.raylet_socket_name + if item.state == gcs_utils.GcsNodeInfo.GcsNodeState.Value("ALIVE"): + ordered_node_ids.append(node_id) + node_info[node_id] = { + "NodeID": node_id, + "Alive": True, + "NodeManagerAddress": item.node_manager_address, + "NodeManagerPort": item.node_manager_port, + "ObjectManagerPort": item.object_manager_port, + "ObjectStoreSocketName": item.object_store_socket_name, + "RayletSocketName": item.raylet_socket_name } - # If this client is being removed, then it must + # If this node is being removed, then it must # have previously been inserted, and # it cannot have previously been removed. else: - assert client_id in node_info, "Client not found!" - assert node_info[client_id]["IsInsertion"], ( - "Unexpected duplicate removal of client.") - node_info[client_id]["IsInsertion"] = client.is_insertion + assert node_id in node_info, "node not found!" 
+ assert node_info[node_id]["Alive"], ( + "Unexpected duplicate removal of node.") + node_info[node_id]["Alive"] = False # Fill resource info. - for client_id in ordered_client_ids: - if node_info[client_id]["IsInsertion"]: - resources = _parse_resource_table(redis_client, client_id) + for node_id in ordered_node_ids: + if node_info[node_id]["Alive"]: + resources = _parse_resource_table(redis_client, node_id) else: resources = {} - node_info[client_id]["Resources"] = resources + node_info[node_id]["Resources"] = resources # NOTE: We return the list comprehension below instead of simply doing # 'list(node_info.values())' in order to have the nodes appear in the order # that they joined the cluster. Python dictionaries do not preserve # insertion order. We could use an OrderedDict, but then we'd have to be # sure to only insert a given node a single time (clients that die appear # twice in the GCS log). - return [node_info[client_id] for client_id in ordered_client_ids] + return [node_info[node_id] for node_id in ordered_node_ids] def _parse_resource_table(redis_client, client_id): @@ -402,7 +402,7 @@ class GlobalState(object): for client in client_table: # These are equivalent and is better for application developers. 
- client["alive"] = client["IsInsertion"] + client["alive"] = client["Alive"] return client_table def _job_table(self, job_id): @@ -690,11 +690,11 @@ class GlobalState(object): """ self._check_connected() - client_id_to_address = {} - for client_info in self.client_table(): - client_id_to_address[client_info["ClientID"]] = "{}:{}".format( - client_info["NodeManagerAddress"], - client_info["ObjectManagerPort"]) + node_id_to_address = {} + for node_info in self.client_table(): + node_id_to_address[node_info["NodeID"]] = "{}:{}".format( + node_info["NodeManagerAddress"], + node_info["ObjectManagerPort"]) all_events = [] @@ -705,13 +705,13 @@ class GlobalState(object): for event in items: if event["event_type"] == "transfer_send": - object_id, remote_client_id, _, _ = event["extra_data"] + object_id, remote_node_id, _, _ = event["extra_data"] elif event["event_type"] == "transfer_receive": - object_id, remote_client_id, _, _ = event["extra_data"] + object_id, remote_node_id, _, _ = event["extra_data"] elif event["event_type"] == "receive_pull_request": - object_id, remote_client_id = event["extra_data"] + object_id, remote_node_id = event["extra_data"] else: assert False, "This should be unreachable." @@ -729,9 +729,9 @@ class GlobalState(object): "name": event["event_type"], # The identifier for the group of rows that the event # appears in. - "pid": client_id_to_address[key], + "pid": node_id_to_address[key], # The identifier for the row that the event appears in. - "tid": client_id_to_address[remote_client_id], + "tid": node_id_to_address[remote_node_id], # The start time in microseconds. "ts": self._seconds_to_microseconds(event["start_time"]), # The duration in microseconds. @@ -825,7 +825,7 @@ class GlobalState(object): clients = self.client_table() for client in clients: # Only count resources from latest entries of live clients. 
- if client["IsInsertion"]: + if client["Alive"]: for key, value in client["Resources"].items(): resources[key] += value return dict(resources) @@ -833,8 +833,8 @@ class GlobalState(object): def _live_client_ids(self): """Returns a set of client IDs corresponding to clients still alive.""" return { - client["ClientID"] - for client in self.client_table() if (client["IsInsertion"]) + client["NodeID"] + for client in self.client_table() if (client["Alive"]) } def available_resources(self): diff --git a/python/ray/tests/cluster_utils.py b/python/ray/tests/cluster_utils.py index c0871be30..36acb5195 100644 --- a/python/ray/tests/cluster_utils.py +++ b/python/ray/tests/cluster_utils.py @@ -174,9 +174,7 @@ class Cluster(object): start_time = time.time() while time.time() - start_time < timeout: clients = ray.state._parse_client_table(redis_client) - live_clients = [ - client for client in clients if client["IsInsertion"] - ] + live_clients = [client for client in clients if client["Alive"]] expected = len(self.list_all_nodes()) if len(live_clients) == expected: diff --git a/python/ray/tests/test_dynres.py b/python/ray/tests/test_dynres.py index 34b104132..0429e17b3 100644 --- a/python/ray/tests/test_dynres.py +++ b/python/ray/tests/test_dynres.py @@ -88,7 +88,7 @@ def test_dynamic_res_updation_clientid(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - target_clientid = ray.nodes()[1]["ClientID"] + target_node_id = ray.nodes()[1]["NodeID"] @ray.remote def set_res(resource_name, resource_capacity, client_id): @@ -96,15 +96,15 @@ def test_dynamic_res_updation_clientid(ray_start_cluster): resource_name, resource_capacity, client_id=client_id) # Create resource - ray.get(set_res.remote(res_name, res_capacity, target_clientid)) + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) # Update resource new_capacity = res_capacity + 1 - ray.get(set_res.remote(res_name, new_capacity, target_clientid)) + ray.get(set_res.remote(res_name, new_capacity, 
target_node_id)) - target_client = next(client for client in ray.nodes() - if client["ClientID"] == target_clientid) - resources = target_client["Resources"] + target_node = next( + node for node in ray.nodes() if node["NodeID"] == target_node_id) + resources = target_node["Resources"] assert res_name in resources assert resources[res_name] == new_capacity @@ -122,17 +122,17 @@ def test_dynamic_res_creation_clientid(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - target_clientid = ray.nodes()[1]["ClientID"] + target_node_id = ray.nodes()[1]["NodeID"] @ray.remote def set_res(resource_name, resource_capacity, res_client_id): ray.experimental.set_resource( resource_name, resource_capacity, client_id=res_client_id) - ray.get(set_res.remote(res_name, res_capacity, target_clientid)) - target_client = next(client for client in ray.nodes() - if client["ClientID"] == target_clientid) - resources = target_client["Resources"] + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) + target_node = next( + node for node in ray.nodes() if node["NodeID"] == target_node_id) + resources = target_node["Resources"] assert res_name in resources assert resources[res_name] == res_capacity @@ -152,7 +152,7 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - target_clientids = [client["ClientID"] for client in ray.nodes()] + target_node_ids = [node["NodeID"] for node in ray.nodes()] @ray.remote def set_res(resource_name, resource_capacity, res_client_id): @@ -160,8 +160,8 @@ def test_dynamic_res_creation_clientid_multiple(ray_start_cluster): resource_name, resource_capacity, client_id=res_client_id) results = [] - for cid in target_clientids: - results.append(set_res.remote(res_name, res_capacity, cid)) + for nid in target_node_ids: + results.append(set_res.remote(res_name, res_capacity, nid)) ray.get(results) success = False @@ -169,10 +169,10 @@ def 
test_dynamic_res_creation_clientid_multiple(ray_start_cluster): while time.time() - start_time < TIMEOUT and not success: resources_created = [] - for cid in target_clientids: - target_client = next( - client for client in ray.nodes() if client["ClientID"] == cid) - resources = target_client["Resources"] + for nid in target_node_ids: + target_node = next( + node for node in ray.nodes() if node["NodeID"] == nid) + resources = target_node["Resources"] resources_created.append(resources[res_name] == res_capacity) success = all(resources_created) assert success @@ -193,7 +193,7 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - target_clientid = ray.nodes()[1]["ClientID"] + target_node_id = ray.nodes()[1]["NodeID"] # Launch the delete task @ray.remote @@ -201,11 +201,11 @@ def test_dynamic_res_deletion_clientid(ray_start_cluster): ray.experimental.set_resource( resource_name, 0, client_id=res_client_id) - ray.get(delete_res.remote(res_name, target_clientid)) + ray.get(delete_res.remote(res_name, target_node_id)) - target_client = next(client for client in ray.nodes() - if client["ClientID"] == target_clientid) - resources = target_client["Resources"] + target_node = next( + node for node in ray.nodes() if node["NodeID"] == target_node_id) + resources = target_node["Resources"] print(ray.cluster_resources()) assert res_name not in resources @@ -225,7 +225,7 @@ def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - clientids = [client["ClientID"] for client in ray.nodes()] + node_ids = [node["NodeID"] for node in ray.nodes()] @ray.remote def set_res(resource_name, resource_capacity, res_client_id): @@ -233,8 +233,8 @@ def test_dynamic_res_creation_scheduler_consistency(ray_start_cluster): resource_name, resource_capacity, client_id=res_client_id) # Create the resource on node1 - target_clientid = clientids[1] - ray.get(set_res.remote(res_name, 
res_capacity, target_clientid)) + target_node_id = node_ids[1] + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) # Define a task which requires this resource @ray.remote(resources={res_name: res_capacity}) @@ -262,7 +262,7 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - clientids = [client["ClientID"] for client in ray.nodes()] + node_ids = [node["NodeID"] for node in ray.nodes()] @ray.remote def delete_res(resource_name, res_client_id): @@ -275,12 +275,12 @@ def test_dynamic_res_deletion_scheduler_consistency(ray_start_cluster): resource_name, resource_capacity, client_id=res_client_id) # Create the resource on node1 - target_clientid = clientids[1] - ray.get(set_res.remote(res_name, res_capacity, target_clientid)) + target_node_id = node_ids[1] + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) assert ray.cluster_resources()[res_name] == res_capacity # Delete the resource - ray.get(delete_res.remote(res_name, target_clientid)) + ray.get(delete_res.remote(res_name, target_node_id)) # Define a task which requires this resource. 
This should not run @ray.remote(resources={res_name: res_capacity}) @@ -315,8 +315,8 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - clientids = [client["ClientID"] for client in ray.nodes()] - target_clientid = clientids[1] + node_ids = [node["NodeID"] for node in ray.nodes()] + target_node_id = node_ids[1] @ray.remote def set_res(resource_name, resource_capacity, res_client_id): @@ -324,7 +324,7 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster): resource_name, resource_capacity, client_id=res_client_id) # Create the resource on node 1 - ray.get(set_res.remote(res_name, res_capacity, target_clientid)) + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) assert ray.cluster_resources()[res_name] == res_capacity # Task to hold the resource till the driver signals to finish @@ -348,7 +348,7 @@ def test_dynamic_res_concurrent_res_increment(ray_start_cluster): ray.get(ray.ObjectID(TASK_RUNNING_OBJECT_ID_STR)) # Update the resource capacity - ray.get(set_res.remote(res_name, updated_capacity, target_clientid)) + ray.get(set_res.remote(res_name, updated_capacity, target_node_id)) # Signal task to complete ray.worker.global_worker.put_object(ray.ObjectID(WAIT_OBJECT_ID_STR), 1) @@ -394,8 +394,8 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - clientids = [client["ClientID"] for client in ray.nodes()] - target_clientid = clientids[1] + node_ids = [node["NodeID"] for node in ray.nodes()] + target_node_id = node_ids[1] @ray.remote def set_res(resource_name, resource_capacity, res_client_id): @@ -403,7 +403,7 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster): resource_name, resource_capacity, client_id=res_client_id) # Create the resource on node 1 - ray.get(set_res.remote(res_name, res_capacity, target_clientid)) + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) assert 
ray.cluster_resources()[res_name] == res_capacity # Task to hold the resource till the driver signals to finish @@ -427,7 +427,7 @@ def test_dynamic_res_concurrent_res_decrement(ray_start_cluster): ray.get(ray.ObjectID(TASK_RUNNING_OBJECT_ID_STR)) # Decrease the resource capacity - ray.get(set_res.remote(res_name, updated_capacity, target_clientid)) + ray.get(set_res.remote(res_name, updated_capacity, target_node_id)) # Signal task to complete ray.worker.global_worker.put_object(ray.ObjectID(WAIT_OBJECT_ID_STR), 1) @@ -471,8 +471,8 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - clientids = [client["ClientID"] for client in ray.nodes()] - target_clientid = clientids[1] + node_ids = [node["NodeID"] for node in ray.nodes()] + target_node_id = node_ids[1] @ray.remote def set_res(resource_name, resource_capacity, res_client_id): @@ -485,7 +485,7 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster): resource_name, 0, client_id=res_client_id) # Create the resource on node 1 - ray.get(set_res.remote(res_name, res_capacity, target_clientid)) + ray.get(set_res.remote(res_name, res_capacity, target_node_id)) assert ray.cluster_resources()[res_name] == res_capacity # Task to hold the resource till the driver signals to finish @@ -509,7 +509,7 @@ def test_dynamic_res_concurrent_res_delete(ray_start_cluster): ray.get(ray.ObjectID(TASK_RUNNING_OBJECT_ID_STR)) # Delete the resource - ray.get(delete_res.remote(res_name, target_clientid)) + ray.get(delete_res.remote(res_name, target_node_id)) # Signal task to complete ray.worker.global_worker.put_object(ray.ObjectID(WAIT_OBJECT_ID_STR), 1) @@ -540,8 +540,8 @@ def test_dynamic_res_creation_stress(ray_start_cluster): ray.init(redis_address=cluster.redis_address) - clientids = [client["ClientID"] for client in ray.nodes()] - target_clientid = clientids[1] + node_ids = [node["NodeID"] for node in ray.nodes()] + target_node_id = node_ids[1] @ray.remote def 
set_res(resource_name, resource_capacity, res_client_id): @@ -554,7 +554,7 @@ def test_dynamic_res_creation_stress(ray_start_cluster): resource_name, 0, client_id=res_client_id) results = [ - set_res.remote(str(i), res_capacity, target_clientid) + set_res.remote(str(i), res_capacity, target_node_id) for i in range(0, NUM_RES_TO_CREATE) ] ray.get(results) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index d2a89bde5..395297de9 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -656,7 +656,7 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes): cluster = ray_start_cluster_2_nodes cluster.wait_for_nodes() - client_ids = {item["ClientID"] for item in ray.nodes()} + node_ids = {item["NodeID"] for item in ray.nodes()} # Try to make sure that the monitor has received at least one heartbeat # from the node. @@ -671,12 +671,12 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes): # Extract the client IDs from the error messages. This will need to be # changed if the error message changes. 
- warning_client_ids = { + warning_node_ids = { item["message"].split(" ")[5] for item in relevant_errors(ray_constants.REMOVED_NODE_ERROR) } - assert client_ids == warning_client_ids + assert node_ids == warning_node_ids def test_raylet_crash_when_get(ray_start_regular): diff --git a/src/ray/gcs/redis_gcs_client_test.cc b/src/ray/gcs/redis_gcs_client_test.cc index 1ba998329..10b498c1d 100644 --- a/src/ray/gcs/redis_gcs_client_test.cc +++ b/src/ray/gcs/redis_gcs_client_test.cc @@ -1151,17 +1151,16 @@ TEST_F(TestGcsWithAsio, TestSetSubscribeCancel) { } void ClientTableNotification(gcs::RedisGcsClient *client, const ClientID &client_id, - const ClientTableData &data, bool is_insertion) { + const GcsNodeInfo &data, bool is_alive) { ClientID added_id = client->client_table().GetLocalClientId(); ASSERT_EQ(client_id, added_id); - ASSERT_EQ(ClientID::FromBinary(data.client_id()), added_id); - ASSERT_EQ(ClientID::FromBinary(data.client_id()), added_id); - ASSERT_EQ(data.is_insertion(), is_insertion); + ASSERT_EQ(ClientID::FromBinary(data.node_id()), added_id); + ASSERT_EQ(data.state() == GcsNodeInfo::ALIVE, is_alive); - ClientTableData cached_client; + GcsNodeInfo cached_client; client->client_table().GetClient(added_id, cached_client); - ASSERT_EQ(ClientID::FromBinary(cached_client.client_id()), added_id); - ASSERT_EQ(cached_client.is_insertion(), is_insertion); + ASSERT_EQ(ClientID::FromBinary(cached_client.node_id()), added_id); + ASSERT_EQ(cached_client.state() == GcsNodeInfo::ALIVE, is_alive); } void TestClientTableConnect(const JobID &job_id, @@ -1169,18 +1168,18 @@ void TestClientTableConnect(const JobID &job_id, // Register callbacks for when a client gets added and removed. The latter // event will stop the event loop. 
client->client_table().RegisterClientAddedCallback( - [](gcs::RedisGcsClient *client, const ClientID &id, const ClientTableData &data) { + [](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { ClientTableNotification(client, id, data, true); test->Stop(); }); // Connect and disconnect to client table. We should receive notifications // for the addition and removal of our own entry. - ClientTableData local_client_info = client->client_table().GetLocalClient(); - local_client_info.set_node_manager_address("127.0.0.1"); - local_client_info.set_node_manager_port(0); - local_client_info.set_object_manager_port(0); - RAY_CHECK_OK(client->client_table().Connect(local_client_info)); + GcsNodeInfo local_node_info = client->client_table().GetLocalClient(); + local_node_info.set_node_manager_address("127.0.0.1"); + local_node_info.set_node_manager_port(0); + local_node_info.set_object_manager_port(0); + RAY_CHECK_OK(client->client_table().Connect(local_node_info)); test->Start(); } @@ -1194,24 +1193,24 @@ void TestClientTableDisconnect(const JobID &job_id, // Register callbacks for when a client gets added and removed. The latter // event will stop the event loop. client->client_table().RegisterClientAddedCallback( - [](gcs::RedisGcsClient *client, const ClientID &id, const ClientTableData &data) { + [](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { ClientTableNotification(client, id, data, /*is_insertion=*/true); // Disconnect from the client table. We should receive a notification // for the removal of our own entry. RAY_CHECK_OK(client->client_table().Disconnect()); }); client->client_table().RegisterClientRemovedCallback( - [](gcs::RedisGcsClient *client, const ClientID &id, const ClientTableData &data) { + [](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { ClientTableNotification(client, id, data, /*is_insertion=*/false); test->Stop(); }); // Connect to the client table. 
We should receive notification for the // addition of our own entry. - ClientTableData local_client_info = client->client_table().GetLocalClient(); - local_client_info.set_node_manager_address("127.0.0.1"); - local_client_info.set_node_manager_port(0); - local_client_info.set_object_manager_port(0); - RAY_CHECK_OK(client->client_table().Connect(local_client_info)); + GcsNodeInfo local_node_info = client->client_table().GetLocalClient(); + local_node_info.set_node_manager_address("127.0.0.1"); + local_node_info.set_node_manager_port(0); + local_node_info.set_object_manager_port(0); + RAY_CHECK_OK(client->client_table().Connect(local_node_info)); test->Start(); } @@ -1225,21 +1224,21 @@ void TestClientTableImmediateDisconnect(const JobID &job_id, // Register callbacks for when a client gets added and removed. The latter // event will stop the event loop. client->client_table().RegisterClientAddedCallback( - [](gcs::RedisGcsClient *client, const ClientID &id, const ClientTableData &data) { + [](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { ClientTableNotification(client, id, data, true); }); client->client_table().RegisterClientRemovedCallback( - [](gcs::RedisGcsClient *client, const ClientID &id, const ClientTableData &data) { + [](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { ClientTableNotification(client, id, data, false); test->Stop(); }); // Connect to then immediately disconnect from the client table. We should // receive notifications for the addition and removal of our own entry. 
- ClientTableData local_client_info = client->client_table().GetLocalClient(); - local_client_info.set_node_manager_address("127.0.0.1"); - local_client_info.set_node_manager_port(0); - local_client_info.set_object_manager_port(0); - RAY_CHECK_OK(client->client_table().Connect(local_client_info)); + GcsNodeInfo local_node_info = client->client_table().GetLocalClient(); + local_node_info.set_node_manager_address("127.0.0.1"); + local_node_info.set_node_manager_port(0); + local_node_info.set_object_manager_port(0); + RAY_CHECK_OK(client->client_table().Connect(local_node_info)); RAY_CHECK_OK(client->client_table().Disconnect()); test->Start(); } @@ -1251,12 +1250,12 @@ TEST_F(TestGcsWithAsio, TestClientTableImmediateDisconnect) { void TestClientTableMarkDisconnected(const JobID &job_id, std::shared_ptr client) { - ClientTableData local_client_info = client->client_table().GetLocalClient(); - local_client_info.set_node_manager_address("127.0.0.1"); - local_client_info.set_node_manager_port(0); - local_client_info.set_object_manager_port(0); + GcsNodeInfo local_node_info = client->client_table().GetLocalClient(); + local_node_info.set_node_manager_address("127.0.0.1"); + local_node_info.set_node_manager_port(0); + local_node_info.set_object_manager_port(0); // Connect to the client table to start receiving notifications. - RAY_CHECK_OK(client->client_table().Connect(local_client_info)); + RAY_CHECK_OK(client->client_table().Connect(local_node_info)); // Mark a different client as dead. ClientID dead_client_id = ClientID::FromRandom(); RAY_CHECK_OK(client->client_table().MarkDisconnected(dead_client_id)); @@ -1264,8 +1263,8 @@ void TestClientTableMarkDisconnected(const JobID &job_id, // marked as dead. 
client->client_table().RegisterClientRemovedCallback( [dead_client_id](gcs::RedisGcsClient *client, const UniqueID &id, - const ClientTableData &data) { - ASSERT_EQ(ClientID::FromBinary(data.client_id()), dead_client_id); + const GcsNodeInfo &data) { + ASSERT_EQ(ClientID::FromBinary(data.node_id()), dead_client_id); test->Stop(); }); test->Start(); diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index b02ebb196..02d447e6a 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -494,8 +494,8 @@ Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timest void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callback) { client_added_callback_ = callback; // Call the callback for any added clients that are cached. - for (const auto &entry : client_cache_) { - if (!entry.first.IsNil() && (entry.second.is_insertion())) { + for (const auto &entry : node_cache_) { + if (!entry.first.IsNil() && (entry.second.state() == GcsNodeInfo::ALIVE)) { client_added_callback_(client_, entry.first, entry.second); } } @@ -504,114 +504,111 @@ void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callbac void ClientTable::RegisterClientRemovedCallback(const ClientTableCallback &callback) { client_removed_callback_ = callback; // Call the callback for any removed clients that are cached. 
- for (const auto &entry : client_cache_) { - if (!entry.first.IsNil() && !entry.second.is_insertion()) { + for (const auto &entry : node_cache_) { + if (!entry.first.IsNil() && (entry.second.state() == GcsNodeInfo::DEAD)) { client_removed_callback_(client_, entry.first, entry.second); } } } void ClientTable::HandleNotification(RedisGcsClient *client, - const ClientTableData &data) { - ClientID client_id = ClientID::FromBinary(data.client_id()); + const GcsNodeInfo &node_info) { + ClientID node_id = ClientID::FromBinary(node_info.node_id()); + bool is_alive = (node_info.state() == GcsNodeInfo::ALIVE); // It's possible to get duplicate notifications from the client table, so // check whether this notification is new. - auto entry = client_cache_.find(client_id); + auto entry = node_cache_.find(node_id); bool is_notif_new; - if (entry == client_cache_.end()) { + if (entry == node_cache_.end()) { // If the entry is not in the cache, then the notification is new. is_notif_new = true; } else { // If the entry is in the cache, then the notification is new if the client // was alive and is now dead or resources have been updated. - bool was_not_deleted = entry->second.is_insertion(); - bool is_deleted = !data.is_insertion(); - is_notif_new = was_not_deleted && is_deleted; + bool was_alive = (entry->second.state() == GcsNodeInfo::ALIVE); + is_notif_new = was_alive && !is_alive; // Once a client with a given ID has been removed, it should never be added // again. If the entry was in the cache and the client was deleted, check // that this new notification is not an insertion. - if (!entry->second.is_insertion()) { - RAY_CHECK(!data.is_insertion()) - << "Notification for addition of a client that was already removed:" - << client_id; + if (!was_alive) { + RAY_CHECK(!is_alive) + << "Notification for addition of a client that was already removed:" << node_id; } } // Add the notification to our cache. Notifications are idempotent. 
RAY_LOG(DEBUG) << "[ClientTableNotification] ClientTable Insertion/Deletion " "notification for client id " - << client_id << ". IsInsertion: " << data.is_insertion() + << node_id << ". IsAlive: " << is_alive << ". Setting the client cache to data."; - client_cache_[client_id] = data; + node_cache_[node_id] = node_info; // If the notification is new, call any registered callbacks. - ClientTableData &cache_data = client_cache_[client_id]; + GcsNodeInfo &cache_data = node_cache_[node_id]; if (is_notif_new) { - if (data.is_insertion()) { + if (is_alive) { if (client_added_callback_ != nullptr) { - client_added_callback_(client, client_id, cache_data); + client_added_callback_(client, node_id, cache_data); } - RAY_CHECK(removed_clients_.find(client_id) == removed_clients_.end()); + RAY_CHECK(removed_nodes_.find(node_id) == removed_nodes_.end()); } else { - // NOTE(swang): The client should be added to this data structure before + // NOTE(swang): The node should be added to this data structure before // the callback gets called, in case the callback depends on the data // structure getting updated. 
- removed_clients_.insert(client_id); + removed_nodes_.insert(node_id); if (client_removed_callback_ != nullptr) { - client_removed_callback_(client, client_id, cache_data); + client_removed_callback_(client, node_id, cache_data); } } } } -void ClientTable::HandleConnected(RedisGcsClient *client, const ClientTableData &data) { - auto connected_client_id = ClientID::FromBinary(data.client_id()); - RAY_CHECK(client_id_ == connected_client_id) - << connected_client_id << " " << client_id_; +void ClientTable::HandleConnected(RedisGcsClient *client, const GcsNodeInfo &node_info) { + auto connected_node_id = ClientID::FromBinary(node_info.node_id()); + RAY_CHECK(node_id_ == connected_node_id) << connected_node_id << " " << node_id_; } -const ClientID &ClientTable::GetLocalClientId() const { return client_id_; } +const ClientID &ClientTable::GetLocalClientId() const { return node_id_; } -const ClientTableData &ClientTable::GetLocalClient() const { return local_client_; } +const GcsNodeInfo &ClientTable::GetLocalClient() const { return local_node_info_; } -bool ClientTable::IsRemoved(const ClientID &client_id) const { - return removed_clients_.count(client_id) == 1; +bool ClientTable::IsRemoved(const ClientID &node_id) const { + return removed_nodes_.count(node_id) == 1; } -Status ClientTable::Connect(const ClientTableData &local_client) { +Status ClientTable::Connect(const GcsNodeInfo &local_node_info) { RAY_CHECK(!disconnected_) << "Tried to reconnect a disconnected client."; - RAY_CHECK(local_client.client_id() == local_client_.client_id()); - local_client_ = local_client; + RAY_CHECK(local_node_info.node_id() == local_node_info_.node_id()); + local_node_info_ = local_node_info; // Construct the data to add to the client table. 
- auto data = std::make_shared(local_client_); - data->set_is_insertion(true); + auto data = std::make_shared(local_node_info_); + data->set_state(GcsNodeInfo::ALIVE); // Callback to handle our own successful connection once we've added // ourselves. auto add_callback = [this](RedisGcsClient *client, const UniqueID &log_key, - const ClientTableData &data) { + const GcsNodeInfo &data) { RAY_CHECK(log_key == client_log_key_); HandleConnected(client, data); // Callback for a notification from the client table. - auto notification_callback = [this]( - RedisGcsClient *client, const UniqueID &log_key, - const std::vector &notifications) { + auto notification_callback = [this](RedisGcsClient *client, const UniqueID &log_key, + const std::vector &notifications) { RAY_CHECK(log_key == client_log_key_); - std::unordered_map connected_nodes; - std::unordered_map disconnected_nodes; + std::unordered_map connected_nodes; + std::unordered_map disconnected_nodes; for (auto &notification : notifications) { // This is temporary fix for Issue 4140 to avoid connect to dead nodes. // TODO(yuhguo): remove this temporary fix after GCS entry is removable. - if (notification.is_insertion()) { - connected_nodes.emplace(notification.client_id(), notification); + if (notification.state() == GcsNodeInfo::ALIVE) { + connected_nodes.emplace(notification.node_id(), notification); } else { - auto iter = connected_nodes.find(notification.client_id()); + auto iter = connected_nodes.find(notification.node_id()); if (iter != connected_nodes.end()) { connected_nodes.erase(iter); } - disconnected_nodes.emplace(notification.client_id(), notification); + disconnected_nodes.emplace(notification.node_id(), notification); } } for (const auto &pair : connected_nodes) { @@ -624,52 +621,51 @@ Status ClientTable::Connect(const ClientTableData &local_client) { // Callback to request notifications from the client table once we've // successfully subscribed. 
auto subscription_callback = [this](RedisGcsClient *c) { - RAY_CHECK_OK(RequestNotifications(JobID::Nil(), client_log_key_, client_id_)); + RAY_CHECK_OK(RequestNotifications(JobID::Nil(), client_log_key_, node_id_)); }; // Subscribe to the client table. - RAY_CHECK_OK(Subscribe(JobID::Nil(), client_id_, notification_callback, - subscription_callback)); + RAY_CHECK_OK( + Subscribe(JobID::Nil(), node_id_, notification_callback, subscription_callback)); }; return Append(JobID::Nil(), client_log_key_, data, add_callback); } Status ClientTable::Disconnect(const DisconnectCallback &callback) { - auto data = std::make_shared(local_client_); - data->set_is_insertion(false); + auto node_info = std::make_shared(local_node_info_); + node_info->set_state(GcsNodeInfo::DEAD); auto add_callback = [this, callback](RedisGcsClient *client, const ClientID &id, - const ClientTableData &data) { + const GcsNodeInfo &data) { HandleConnected(client, data); RAY_CHECK_OK(CancelNotifications(JobID::Nil(), client_log_key_, id)); if (callback != nullptr) { callback(); } }; - RAY_RETURN_NOT_OK(Append(JobID::Nil(), client_log_key_, data, add_callback)); + RAY_RETURN_NOT_OK(Append(JobID::Nil(), client_log_key_, node_info, add_callback)); // We successfully added the deletion entry. Mark ourselves as disconnected. 
disconnected_ = true; return Status::OK(); } -ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) { - auto data = std::make_shared(); - data->set_client_id(dead_client_id.Binary()); - data->set_is_insertion(false); - return Append(JobID::Nil(), client_log_key_, data, nullptr); +ray::Status ClientTable::MarkDisconnected(const ClientID &dead_node_id) { + auto node_info = std::make_shared(); + node_info->set_node_id(dead_node_id.Binary()); + node_info->set_state(GcsNodeInfo::DEAD); + return Append(JobID::Nil(), client_log_key_, node_info, nullptr); } -void ClientTable::GetClient(const ClientID &client_id, - ClientTableData &client_info) const { - RAY_CHECK(!client_id.IsNil()); - auto entry = client_cache_.find(client_id); - if (entry != client_cache_.end()) { - client_info = entry->second; +void ClientTable::GetClient(const ClientID &node_id, GcsNodeInfo &node_info) const { + RAY_CHECK(!node_id.IsNil()); + auto entry = node_cache_.find(node_id); + if (entry != node_cache_.end()) { + node_info = entry->second; } else { - client_info.set_client_id(ClientID::Nil().Binary()); + node_info.set_node_id(ClientID::Nil().Binary()); } } -const std::unordered_map &ClientTable::GetAllClients() const { - return client_cache_; +const std::unordered_map &ClientTable::GetAllClients() const { + return node_cache_; } Status ClientTable::Lookup(const Callback &lookup) { @@ -679,9 +675,9 @@ Status ClientTable::Lookup(const Callback &lookup) { std::string ClientTable::DebugString() const { std::stringstream result; - result << Log::DebugString(); - result << ", cache size: " << client_cache_.size() - << ", num removed: " << removed_clients_.size(); + result << Log::DebugString(); + result << ", cache size: " << node_cache_.size() + << ", num removed: " << removed_nodes_.size(); return result.str(); } @@ -728,7 +724,7 @@ template class Table; template class Table; template class Table; template class Log; -template class Log; +template class Log; template class Log; 
template class Log; template class Table; diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 98241eb34..60d7a6dc6 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -23,10 +23,10 @@ namespace gcs { using rpc::ActorCheckpointData; using rpc::ActorCheckpointIdData; using rpc::ActorTableData; -using rpc::ClientTableData; using rpc::ErrorTableData; using rpc::GcsChangeMode; using rpc::GcsEntry; +using rpc::GcsNodeInfo; using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; @@ -820,34 +820,34 @@ class ProfileTable : private Log { /// it should append an entry to the log indicating that it is dead. A client /// that is marked as dead should never again be marked as alive; if it needs /// to reconnect, it must connect with a different ClientID. -class ClientTable : public Log { +class ClientTable : public Log { public: using ClientTableCallback = std::function; + RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data)>; using DisconnectCallback = std::function; ClientTable(const std::vector> &contexts, - RedisGcsClient *client, const ClientID &client_id) + RedisGcsClient *client, const ClientID &node_id) : Log(contexts, client), // We set the client log's key equal to nil so that all instances of // ClientTable have the same key. client_log_key_(), disconnected_(false), - client_id_(client_id), - local_client_() { + node_id_(node_id), + local_node_info_() { pubsub_channel_ = TablePubsub::CLIENT_PUBSUB; prefix_ = TablePrefix::CLIENT; - // Set the local client's ID. - local_client_.set_client_id(client_id.Binary()); + // Set the local node's ID. + local_node_info_.set_node_id(node_id.Binary()); }; /// Connect as a client to the GCS. This registers us in the client table /// and begins subscription to client table notifications. /// - /// \param Information about the connecting client. This must have the - /// same client_id as the one set in the client table. 
+ /// \param local_node_info Information about the connecting client. This must have the + /// same id as the one set in the client table. /// \return Status - ray::Status Connect(const ClientTableData &local_client); + ray::Status Connect(const GcsNodeInfo &local_node_info); /// Disconnect the client from the GCS. The client ID assigned during /// registration should never be reused after disconnecting. @@ -858,9 +858,9 @@ class ClientTable : public Log { /// Mark a different client as disconnected. The client ID should never be /// reused for a new client. /// - /// \param dead_client_id The ID of the client to mark as dead. + /// \param dead_node_id The ID of the client to mark as dead. /// \return Status - ray::Status MarkDisconnected(const ClientID &dead_client_id); + ray::Status MarkDisconnected(const ClientID &dead_node_id); /// Register a callback to call when a new client is added. /// @@ -876,11 +876,11 @@ class ClientTable : public Log { /// information for clients that we've heard a notification for. /// /// \param client The client to get information about. - /// \param A reference to the client information. If we have information + /// \param node_info A reference to the client information. If we have information /// about the client in the cache, then the reference will be modified to /// contain that information. Else, the reference will be updated to contain /// a nil client ID. - void GetClient(const ClientID &client, ClientTableData &client_info) const; + void GetClient(const ClientID &client, GcsNodeInfo &node_info) const; /// Get the local client's ID. /// @@ -890,18 +890,18 @@ class ClientTable : public Log { /// Get the local client's information. /// /// \return The local client's information. - const ClientTableData &GetLocalClient() const; + const GcsNodeInfo &GetLocalClient() const; /// Check whether the given client is removed. /// - /// \param client_id The ID of the client to check. + /// \param node_id The ID of the client to check. 
/// \return Whether the client with ID client_id is removed. - bool IsRemoved(const ClientID &client_id) const; + bool IsRemoved(const ClientID &node_id) const; /// Get the information of all clients. /// /// \return The client ID to client information map. - const std::unordered_map &GetAllClients() const; + const std::unordered_map &GetAllClients() const; /// Lookup the client data in the client table. /// @@ -922,23 +922,23 @@ class ClientTable : public Log { private: /// Handle a client table notification. - void HandleNotification(RedisGcsClient *client, const ClientTableData &notifications); + void HandleNotification(RedisGcsClient *client, const GcsNodeInfo &node_info); /// Handle this client's successful connection to the GCS. - void HandleConnected(RedisGcsClient *client, const ClientTableData &client_data); + void HandleConnected(RedisGcsClient *client, const GcsNodeInfo &node_info); /// Whether this client has called Disconnect(). bool disconnected_; - /// This client's ID. - const ClientID client_id_; - /// Information about this client. - ClientTableData local_client_; + /// This node's ID. + const ClientID node_id_; + /// Information about this node. + GcsNodeInfo local_node_info_; /// The callback to call when a new client is added. ClientTableCallback client_added_callback_; /// The callback to call when a client is removed. ClientTableCallback client_removed_callback_; - /// A cache for information about all clients. - std::unordered_map client_cache_; - /// The set of removed clients. - std::unordered_set removed_clients_; + /// A cache for information about all nodes. + std::unordered_map node_cache_; + /// The set of removed nodes. 
+ std::unordered_set removed_nodes_; }; } // namespace gcs diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index d4175bdb0..f7e468391 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -8,8 +8,8 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, namespace { -using ray::rpc::ClientTableData; using ray::rpc::GcsChangeMode; +using ray::rpc::GcsNodeInfo; using ray::rpc::ObjectTableData; /// Process a notification of the object table entries and store the result in @@ -106,14 +106,14 @@ ray::Status ObjectDirectory::ReportObjectRemoved( void ObjectDirectory::LookupRemoteConnectionInfo( RemoteConnectionInfo &connection_info) const { - ClientTableData client_data; - gcs_client_->client_table().GetClient(connection_info.client_id, client_data); - ClientID result_client_id = ClientID::FromBinary(client_data.client_id()); + GcsNodeInfo node_info; + gcs_client_->client_table().GetClient(connection_info.client_id, node_info); + ClientID result_client_id = ClientID::FromBinary(node_info.node_id()); if (!result_client_id.IsNil()) { RAY_CHECK(result_client_id == connection_info.client_id); - if (client_data.is_insertion()) { - connection_info.ip = client_data.node_manager_address(); - connection_info.port = static_cast(client_data.object_manager_port()); + if (node_info.state() == GcsNodeInfo::ALIVE) { + connection_info.ip = node_info.node_manager_address(); + connection_info.port = static_cast(node_info.object_manager_port()); } } } diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index fa217ab7b..b3dc1632e 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -11,7 +11,7 @@ namespace ray { -using rpc::ClientTableData; +using rpc::GcsNodeInfo; std::string store_executable; @@ -45,12 
+45,12 @@ class MockServer { private: ray::Status RegisterGcs(boost::asio::io_service &io_service) { auto object_manager_port = config_.object_manager_port; - ClientTableData client_info = gcs_client_->client_table().GetLocalClient(); - client_info.set_node_manager_address("127.0.0.1"); - client_info.set_node_manager_port(object_manager_port); - client_info.set_object_manager_port(object_manager_port); + GcsNodeInfo node_info = gcs_client_->client_table().GetLocalClient(); + node_info.set_node_manager_address("127.0.0.1"); + node_info.set_node_manager_port(object_manager_port); + node_info.set_object_manager_port(object_manager_port); - ray::Status status = gcs_client_->client_table().Connect(client_info); + ray::Status status = gcs_client_->client_table().Connect(node_info); object_manager_.RegisterGcs(); return status; } @@ -215,9 +215,8 @@ class StressTestObjectManager : public TestObjectManagerBase { client_id_1 = gcs_client_1->client_table().GetLocalClientId(); client_id_2 = gcs_client_2->client_table().GetLocalClientId(); gcs_client_1->client_table().RegisterClientAddedCallback( - [this](gcs::RedisGcsClient *client, const ClientID &id, - const ClientTableData &data) { - ClientID parsed_id = ClientID::FromBinary(data.client_id()); + [this](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { + ClientID parsed_id = ClientID::FromBinary(data.node_id()); if (parsed_id == client_id_1 || parsed_id == client_id_2) { num_connected_clients += 1; } @@ -412,14 +411,14 @@ class StressTestObjectManager : public TestObjectManagerBase { RAY_LOG(DEBUG) << "\n" << "All connected clients:" << "\n"; - ClientTableData data; + GcsNodeInfo data; gcs_client_1->client_table().GetClient(client_id_1, data); - RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data.client_id()) << "\n" + RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data.node_id()) << "\n" << "ClientIp=" << data.node_manager_address() << "\n" << "ClientPort=" << 
data.node_manager_port(); - ClientTableData data2; + GcsNodeInfo data2; gcs_client_1->client_table().GetClient(client_id_2, data2); - RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data2.client_id()) << "\n" + RAY_LOG(DEBUG) << "ClientID=" << ClientID::FromBinary(data2.node_id()) << "\n" << "ClientIp=" << data2.node_manager_address() << "\n" << "ClientPort=" << data2.node_manager_port(); } diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 8d8941e9f..78e641367 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -14,7 +14,7 @@ int64_t wait_timeout_ms; namespace ray { -using rpc::ClientTableData; +using rpc::GcsNodeInfo; static inline void flushall_redis(void) { redisContext *context = redisConnect("127.0.0.1", 6379); @@ -39,12 +39,12 @@ class MockServer { private: ray::Status RegisterGcs(boost::asio::io_service &io_service) { auto object_manager_port = config_.object_manager_port; - ClientTableData client_info = gcs_client_->client_table().GetLocalClient(); - client_info.set_node_manager_address("127.0.0.1"); - client_info.set_node_manager_port(object_manager_port); - client_info.set_object_manager_port(object_manager_port); + GcsNodeInfo node_info = gcs_client_->client_table().GetLocalClient(); + node_info.set_node_manager_address("127.0.0.1"); + node_info.set_node_manager_port(object_manager_port); + node_info.set_object_manager_port(object_manager_port); - ray::Status status = gcs_client_->client_table().Connect(client_info); + ray::Status status = gcs_client_->client_table().Connect(node_info); object_manager_.RegisterGcs(); return status; } @@ -196,9 +196,8 @@ class TestObjectManager : public TestObjectManagerBase { client_id_1 = gcs_client_1->client_table().GetLocalClientId(); client_id_2 = gcs_client_2->client_table().GetLocalClientId(); gcs_client_1->client_table().RegisterClientAddedCallback( - 
[this](gcs::RedisGcsClient *client, const ClientID &id, - const ClientTableData &data) { - ClientID parsed_id = ClientID::FromBinary(data.client_id()); + [this](gcs::RedisGcsClient *client, const ClientID &id, const GcsNodeInfo &data) { + ClientID parsed_id = ClientID::FromBinary(data.node_id()); if (parsed_id == client_id_1 || parsed_id == client_id_2) { num_connected_clients += 1; } @@ -434,19 +433,19 @@ class TestObjectManager : public TestObjectManagerBase { RAY_LOG(DEBUG) << "\n" << "Server client ids:" << "\n"; - ClientTableData data; + GcsNodeInfo data; gcs_client_1->client_table().GetClient(client_id_1, data); - RAY_LOG(DEBUG) << (ClientID::FromBinary(data.client_id()).IsNil()); - RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::FromBinary(data.client_id()); + RAY_LOG(DEBUG) << (ClientID::FromBinary(data.node_id()).IsNil()); + RAY_LOG(DEBUG) << "Server 1 ClientID=" << ClientID::FromBinary(data.node_id()); RAY_LOG(DEBUG) << "Server 1 ClientIp=" << data.node_manager_address(); RAY_LOG(DEBUG) << "Server 1 ClientPort=" << data.node_manager_port(); - ASSERT_EQ(client_id_1, ClientID::FromBinary(data.client_id())); - ClientTableData data2; + ASSERT_EQ(client_id_1, ClientID::FromBinary(data.node_id())); + GcsNodeInfo data2; gcs_client_1->client_table().GetClient(client_id_2, data2); - RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::FromBinary(data2.client_id()); + RAY_LOG(DEBUG) << "Server 2 ClientID=" << ClientID::FromBinary(data2.node_id()); RAY_LOG(DEBUG) << "Server 2 ClientIp=" << data2.node_manager_address(); RAY_LOG(DEBUG) << "Server 2 ClientPort=" << data2.node_manager_port(); - ASSERT_EQ(client_id_2, ClientID::FromBinary(data2.client_id())); + ASSERT_EQ(client_id_2, ClientID::FromBinary(data2.node_id())); } }; diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 05d97750f..ab59c1b0d 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -151,28 +151,32 @@ message ResourceTableData { double resource_capacity 
= 1; } -message ClientTableData { - // The client ID of the client that the message is about. - bytes client_id = 1; - // The IP address of the client's node manager. +message GcsNodeInfo { + // State of a node. + enum GcsNodeState { + // Node is alive. + ALIVE = 0; + // Node is dead. + DEAD = 1; + } + + // The ID of node. + bytes node_id = 1; + // The IP address of the node manager. string node_manager_address = 2; - // The IPC socket name of the client's raylet. + // The IPC socket name of raylet. string raylet_socket_name = 3; - // The IPC socket name of the client's plasma store. + // The IPC socket name of the node's plasma store. string object_store_socket_name = 4; - // The port at which the client's node manager is listening for TCP + // The port at which the node manager is listening for TCP // connections from other node managers. int32 node_manager_port = 5; - // The port at which the client's object manager is listening for TCP + // The port at which the object manager is listening for TCP // connections from other object managers. int32 object_manager_port = 6; - // True if the message is about the addition of a client and false if it is - // about the deletion of a client. - bool is_insertion = 7; - // TODO(hchen): Define the following resources in map format. - repeated string resources_total_label = 8; - repeated double resources_total_capacity = 9; + // Current state of this node. 
+ GcsNodeState state = 7; } message HeartbeatTableData { diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index 708dfc039..2113c1bb1 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -49,10 +49,11 @@ void Monitor::Tick() { RAY_LOG(WARNING) << "Client timed out: " << client_id; auto lookup_callback = [this, client_id]( gcs::RedisGcsClient *client, const ClientID &id, - const std::vector &all_data) { + const std::vector &all_node) { bool marked = false; - for (const auto &data : all_data) { - if (client_id.Binary() == data.client_id() && !data.is_insertion()) { + for (const auto &node : all_node) { + if (client_id.Binary() == node.node_id() && + node.state() == GcsNodeInfo::DEAD) { // The node has been marked dead by itself. marked = true; } diff --git a/src/ray/raylet/monitor.h b/src/ray/raylet/monitor.h index 8c5a8d150..b6ea5058c 100644 --- a/src/ray/raylet/monitor.h +++ b/src/ray/raylet/monitor.h @@ -11,7 +11,7 @@ namespace ray { namespace raylet { -using rpc::ClientTableData; +using rpc::GcsNodeInfo; using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a1ac878c2..93dd2ceec 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -176,14 +176,13 @@ ray::Status NodeManager::RegisterGcs() { // Register a callback on the client table for new clients. auto node_manager_client_added = [this](gcs::RedisGcsClient *client, const UniqueID &id, - const ClientTableData &data) { - ClientAdded(data); - }; + const GcsNodeInfo &data) { ClientAdded(data); }; gcs_client_->client_table().RegisterClientAddedCallback(node_manager_client_added); // Register a callback on the client table for removed clients. 
- auto node_manager_client_removed = - [this](gcs::RedisGcsClient *client, const UniqueID &id, - const ClientTableData &data) { ClientRemoved(data); }; + auto node_manager_client_removed = [this](gcs::RedisGcsClient *client, + const UniqueID &id, const GcsNodeInfo &data) { + ClientRemoved(data); + }; gcs_client_->client_table().RegisterClientRemovedCallback(node_manager_client_removed); // Subscribe to resource changes. @@ -381,8 +380,8 @@ void NodeManager::GetObjectManagerProfileInfo() { } } -void NodeManager::ClientAdded(const ClientTableData &client_data) { - const ClientID client_id = ClientID::FromBinary(client_data.client_id()); +void NodeManager::ClientAdded(const GcsNodeInfo &node_info) { + const ClientID client_id = ClientID::FromBinary(node_info.node_id()); RAY_LOG(DEBUG) << "[ClientAdded] Received callback from client id " << client_id; if (client_id == gcs_client_->client_table().GetLocalClientId()) { @@ -401,8 +400,8 @@ void NodeManager::ClientAdded(const ClientTableData &client_data) { // Initialize a rpc client to the new node manager. std::unique_ptr client( - new rpc::NodeManagerClient(client_data.node_manager_address(), - client_data.node_manager_port(), client_call_manager_)); + new rpc::NodeManagerClient(node_info.node_manager_address(), + node_info.node_manager_port(), client_call_manager_)); remote_node_manager_clients_.emplace(client_id, std::move(client)); // Fetch resource info for the remote client and update cluster resource map. @@ -420,10 +419,10 @@ void NodeManager::ClientAdded(const ClientTableData &client_data) { })); } -void NodeManager::ClientRemoved(const ClientTableData &client_data) { +void NodeManager::ClientRemoved(const GcsNodeInfo &node_info) { // TODO(swang): If we receive a notification for our own death, clean up and // exit immediately. 
- const ClientID client_id = ClientID::FromBinary(client_data.client_id()); + const ClientID client_id = ClientID::FromBinary(node_info.node_id()); RAY_LOG(DEBUG) << "[ClientRemoved] Received callback from client id " << client_id; RAY_CHECK(client_id != gcs_client_->client_table().GetLocalClientId()) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d7418ea3a..acafc256b 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -29,8 +29,8 @@ namespace ray { namespace raylet { using rpc::ActorTableData; -using rpc::ClientTableData; using rpc::ErrorType; +using rpc::GcsNodeInfo; using rpc::HeartbeatBatchTableData; using rpc::HeartbeatTableData; using rpc::JobTableData; @@ -175,12 +175,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// /// \param data Data associated with the new client. /// \return Void. - void ClientAdded(const ClientTableData &data); + void ClientAdded(const GcsNodeInfo &data); /// Handler for the removal of a GCS client. - /// \param client_data Data associated with the removed client. + /// \param node_info Data associated with the removed client. /// \return Void. - void ClientRemoved(const ClientTableData &client_data); + void ClientRemoved(const GcsNodeInfo &node_info); /// Handler for the addition or updation of a resource in the GCS /// \param client_id ID of the node that created or updated resources. 
diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 331ad027d..73525f2e4 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -139,8 +139,8 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { client_id_2 = gcs_client_2->client_table().GetLocalClientId(); gcs_client_1->client_table().RegisterClientAddedCallback( [this](gcs::RedisGcsClient *client, const ClientID &id, - const ClientTableDataT &data) { - ClientID parsed_id = ClientID::FromBinary(data.client_id); + const rpc::GcsNodeInfo &data) { + ClientID parsed_id = ClientID::FromBinary(data.node_id); if (parsed_id == client_id_1 || parsed_id == client_id_2) { num_connected_clients += 1; } @@ -208,17 +208,17 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { RAY_LOG(INFO) << "\n" << "All connected clients:" << "\n"; - ClientTableDataT data; + rpc::GcsNodeInfo data; gcs_client_2->client_table().GetClient(client_id_1, data); - RAY_LOG(INFO) << (ClientID::FromBinary(data.client_id).IsNil()); - RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data.client_id); - RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address; - RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port; - ClientTableDataT data2; + RAY_LOG(INFO) << (ClientID::FromBinary(data.node_id()).IsNil()); + RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data.node_id()); + RAY_LOG(INFO) << "ClientIp=" << data.node_manager_address(); + RAY_LOG(INFO) << "ClientPort=" << data.node_manager_port(); + rpc::GcsNodeInfo data2; gcs_client_1->client_table().GetClient(client_id_2, data2); - RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data2.client_id); - RAY_LOG(INFO) << "ClientIp=" << data2.node_manager_address; - RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port; + RAY_LOG(INFO) << "ClientID=" << ClientID::FromBinary(data2.node_id()); + RAY_LOG(INFO) << "ClientIp=" 
<< data2.node_manager_address(); + RAY_LOG(INFO) << "ClientPort=" << data2.node_manager_port(); } }; diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 0322e64a3..bcca12411 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -50,20 +50,20 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address, const std::string &redis_password, boost::asio::io_service &io_service, const NodeManagerConfig &node_manager_config) { - ClientTableData client_info = gcs_client_->client_table().GetLocalClient(); - client_info.set_node_manager_address(node_ip_address); - client_info.set_raylet_socket_name(raylet_socket_name); - client_info.set_object_store_socket_name(object_store_socket_name); - client_info.set_object_manager_port(object_manager_.GetServerPort()); - client_info.set_node_manager_port(node_manager_.GetServerPort()); + GcsNodeInfo node_info = gcs_client_->client_table().GetLocalClient(); + node_info.set_node_manager_address(node_ip_address); + node_info.set_raylet_socket_name(raylet_socket_name); + node_info.set_object_store_socket_name(object_store_socket_name); + node_info.set_object_manager_port(object_manager_.GetServerPort()); + node_info.set_node_manager_port(node_manager_.GetServerPort()); RAY_LOG(DEBUG) << "Node manager " << gcs_client_->client_table().GetLocalClientId() - << " started on " << client_info.node_manager_address() << ":" - << client_info.node_manager_port() << " object manager at " - << client_info.node_manager_address() << ":" - << client_info.object_manager_port(); + << " started on " << node_info.node_manager_address() << ":" + << node_info.node_manager_port() << " object manager at " + << node_info.node_manager_address() << ":" + << node_info.object_manager_port(); ; - RAY_RETURN_NOT_OK(gcs_client_->client_table().Connect(client_info)); + RAY_RETURN_NOT_OK(gcs_client_->client_table().Connect(node_info)); // Add resource information. 
std::unordered_map> resources; diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index d39362f70..7962d5114 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -16,7 +16,7 @@ namespace ray { namespace raylet { -using rpc::ClientTableData; +using rpc::GcsNodeInfo; class NodeManager;