From 43b6513d19f540a7a0985fbe63c79df4df52f8b4 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Thu, 11 Jul 2019 13:17:19 +0800 Subject: [PATCH] [GCS] Move node resource info from client table to resource table (#5050) --- doc/source/development.rst | 2 +- .../java/org/ray/runtime/gcs/GcsClient.java | 54 +++---- .../java/org/ray/runtime/gcs/RedisClient.java | 7 +- python/ray/gcs_utils.py | 2 + python/ray/state.py | 77 ++++++---- python/ray/tests/cluster_utils.py | 4 +- src/ray/gcs/client_test.cc | 30 ++-- src/ray/gcs/tables.cc | 118 +++------------ src/ray/gcs/tables.h | 18 +-- src/ray/object_manager/object_directory.cc | 2 +- src/ray/protobuf/gcs.proto | 19 +-- src/ray/raylet/monitor.cc | 3 +- src/ray/raylet/node_manager.cc | 134 +++++++++--------- src/ray/raylet/node_manager.h | 12 +- src/ray/raylet/raylet.cc | 15 +- src/ray/raylet/scheduling_resources.cc | 38 ----- src/ray/raylet/scheduling_resources.h | 12 -- src/ray/util/logging.h | 2 + 18 files changed, 215 insertions(+), 334 deletions(-) diff --git a/doc/source/development.rst b/doc/source/development.rst index ecbed6c31..65d802553 100644 --- a/doc/source/development.rst +++ b/doc/source/development.rst @@ -84,7 +84,7 @@ like so: ray.nodes() # Returns current information about the nodes in the cluster, such as: # [{'ClientID': '2a9d2b34ad24a37ed54e4fcd32bf19f915742f5b', - # 'EntryType': 0, + # 'IsInsertion': True, # 'NodeManagerAddress': '1.2.3.4', # 'NodeManagerPort': 43280, # 'ObjectManagerPort': 38062, diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java b/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java index 17c248ed0..c5f849a75 100644 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java @@ -1,6 +1,7 @@ package org.ray.runtime.gcs; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.InvalidProtocolBufferException; import 
java.util.ArrayList; import java.util.HashMap; @@ -13,9 +14,9 @@ import org.ray.api.id.BaseId; import org.ray.api.id.TaskId; import org.ray.api.id.UniqueId; import org.ray.api.runtimecontext.NodeInfo; +import org.ray.runtime.generated.Gcs; import org.ray.runtime.generated.Gcs.ActorCheckpointIdData; import org.ray.runtime.generated.Gcs.ClientTableData; -import org.ray.runtime.generated.Gcs.ClientTableData.EntryType; import org.ray.runtime.generated.Gcs.TablePrefix; import org.ray.runtime.util.IdUtil; import org.slf4j.Logger; @@ -72,42 +73,47 @@ public class GcsClient { final UniqueId clientId = UniqueId .fromByteBuffer(data.getClientId().asReadOnlyByteBuffer()); - if (data.getEntryType() == EntryType.INSERTION) { + if (data.getIsInsertion()) { //Code path of node insertion. - Map resources = new HashMap<>(); - // Compute resources. - Preconditions.checkState( - data.getResourcesTotalLabelCount() == data.getResourcesTotalCapacityCount()); - for (int i = 0; i < data.getResourcesTotalLabelCount(); i++) { - resources.put(data.getResourcesTotalLabel(i), data.getResourcesTotalCapacity(i)); - } NodeInfo nodeInfo = new NodeInfo( - clientId, data.getNodeManagerAddress(), true, resources); + clientId, data.getNodeManagerAddress(), true, ImmutableMap.of()); clients.put(clientId, nodeInfo); - } else if (data.getEntryType() == EntryType.RES_CREATEUPDATE) { - Preconditions.checkState(clients.containsKey(clientId)); - NodeInfo nodeInfo = clients.get(clientId); - for (int i = 0; i < data.getResourcesTotalLabelCount(); i++) { - nodeInfo.resources.put(data.getResourcesTotalLabel(i), data.getResourcesTotalCapacity(i)); - } - } else if (data.getEntryType() == EntryType.RES_DELETE) { - Preconditions.checkState(clients.containsKey(clientId)); - NodeInfo nodeInfo = clients.get(clientId); - for (int i = 0; i < data.getResourcesTotalLabelCount(); i++) { - nodeInfo.resources.remove(data.getResourcesTotalLabel(i)); - } } else { // Code path of node deletion. 
- Preconditions.checkState(data.getEntryType() == EntryType.DELETION); NodeInfo nodeInfo = new NodeInfo(clientId, clients.get(clientId).nodeAddress, - false, clients.get(clientId).resources); + false, ImmutableMap.of()); clients.put(clientId, nodeInfo); } } + // Fill resources. + for (Map.Entry client : clients.entrySet()) { + if (client.getValue().isAlive) { + client.getValue().resources.putAll(getResourcesForClient(client.getKey())); + } + } + return new ArrayList<>(clients.values()); } + private Map getResourcesForClient(UniqueId clientId) { + final String prefix = TablePrefix.NODE_RESOURCE.toString(); + final byte[] key = ArrayUtils.addAll(prefix.getBytes(), clientId.getBytes()); + Map results = primary.hgetAll(key); + Map resources = new HashMap<>(); + for (Map.Entry entry : results.entrySet()) { + String resourceName = new String(entry.getKey()); + Gcs.ResourceTableData resourceTableData; + try { + resourceTableData = Gcs.ResourceTableData.parseFrom(entry.getValue()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Received invalid protobuf data from GCS."); + } + resources.put(resourceName, resourceTableData.getResourceCapacity()); + } + return resources; + } + /** * If the actor exists in GCS. 
*/ diff --git a/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java b/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java index e7c1d5473..dbce9750e 100644 --- a/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java +++ b/java/runtime/src/main/java/org/ray/runtime/gcs/RedisClient.java @@ -49,14 +49,18 @@ public class RedisClient { return jedis.hset(key, field, value); } } - } public String hmset(String key, Map hash) { try (Jedis jedis = jedisPool.getResource()) { return jedis.hmset(key, hash); } + } + public Map hgetAll(byte[] key) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.hgetAll(key); + } } public String get(final String key, final String field) { @@ -67,7 +71,6 @@ public class RedisClient { return jedis.hget(key, field); } } - } public byte[] get(byte[] key) { diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 43ce42d91..b000a1d60 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -16,6 +16,7 @@ from ray.core.generated.gcs_pb2 import ( TablePrefix, TablePubsub, TaskTableData, + ResourceTableData, ) __all__ = [ @@ -32,6 +33,7 @@ __all__ = [ "TablePrefix", "TablePubsub", "TaskTableData", + "ResourceTableData", "construct_error_message", ] diff --git a/python/ray/state.py b/python/ray/state.py index c9f718fde..2e16bd7a0 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -50,47 +50,35 @@ def _parse_client_table(redis_client): for entry in gcs_entry.entries: client = gcs_utils.ClientTableData.FromString(entry) - resources = { - client.resources_total_label[i]: client.resources_total_capacity[i] - for i in range(len(client.resources_total_label)) - } client_id = ray.utils.binary_to_hex(client.client_id) - if client.entry_type == gcs_utils.ClientTableData.INSERTION: + if client.is_insertion: ordered_client_ids.append(client_id) node_info[client_id] = { "ClientID": client_id, - "EntryType": client.entry_type, + "IsInsertion": client.is_insertion, 
"NodeManagerAddress": client.node_manager_address, "NodeManagerPort": client.node_manager_port, "ObjectManagerPort": client.object_manager_port, "ObjectStoreSocketName": client.object_store_socket_name, - "RayletSocketName": client.raylet_socket_name, - "Resources": resources + "RayletSocketName": client.raylet_socket_name } - # If this client is being updated, then it must + # If this client is being removed, then it must # have previously been inserted, and # it cannot have previously been removed. else: assert client_id in node_info, "Client not found!" - is_deletion = (node_info[client_id]["EntryType"] != - gcs_utils.ClientTableData.DELETION) - assert is_deletion, "Unexpected updation of deleted client." - res_map = node_info[client_id]["Resources"] - if client.entry_type == gcs_utils.ClientTableData.RES_CREATEUPDATE: - for res in resources: - res_map[res] = resources[res] - elif client.entry_type == gcs_utils.ClientTableData.RES_DELETE: - for res in resources: - res_map.pop(res, None) - elif client.entry_type == gcs_utils.ClientTableData.DELETION: - pass # Do nothing with the resmap if client deletion - else: - raise RuntimeError("Unexpected EntryType {}".format( - client.entry_type)) - node_info[client_id]["Resources"] = res_map - node_info[client_id]["EntryType"] = client.entry_type + assert node_info[client_id]["IsInsertion"], ( + "Unexpected duplicate removal of client.") + node_info[client_id]["IsInsertion"] = client.is_insertion + # Fill resource info. + for client_id in ordered_client_ids: + if node_info[client_id]["IsInsertion"]: + resources = _parse_resource_table(redis_client, client_id) + else: + resources = {} + node_info[client_id]["Resources"] = resources # NOTE: We return the list comprehension below instead of simply doing # 'list(node_info.values())' in order to have the nodes appear in the order # that they joined the cluster. 
Python dictionaries do not preserve @@ -100,6 +88,38 @@ def _parse_client_table(redis_client): return [node_info[client_id] for client_id in ordered_client_ids] +def _parse_resource_table(redis_client, client_id): + """Read the resource table with given client id. + + Args: + redis_client: A client to the primary Redis shard. + client_id: The client ID of the node in hex. + + Returns: + A dict of resources about this node. + """ + message = redis_client.execute_command( + "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("NODE_RESOURCE"), "", + ray.utils.hex_to_binary(client_id)) + + if message is None: + return {} + + resources = {} + gcs_entry = gcs_utils.GcsEntry.FromString(message) + entries_len = len(gcs_entry.entries) + if entries_len % 2 != 0: + raise Exception("Invalid entry size for resource lookup: " + + str(entries_len)) + + for i in range(0, entries_len, 2): + resource_table_data = gcs_utils.ResourceTableData.FromString( + gcs_entry.entries[i + 1]) + resources[decode( + gcs_entry.entries[i])] = resource_table_data.resource_capacity + return resources + + class GlobalState(object): """A class used to interface with the Ray control state. @@ -800,7 +820,7 @@ class GlobalState(object): clients = self.client_table() for client in clients: # Only count resources from latest entries of live clients. 
- if client["EntryType"] != gcs_utils.ClientTableData.DELETION: + if client["IsInsertion"]: for key, value in client["Resources"].items(): resources[key] += value return dict(resources) @@ -809,8 +829,7 @@ class GlobalState(object): """Returns a set of client IDs corresponding to clients still alive.""" return { client["ClientID"] - for client in self.client_table() - if (client["EntryType"] != gcs_utils.ClientTableData.DELETION) + for client in self.client_table() if (client["IsInsertion"]) } def available_resources(self): diff --git a/python/ray/tests/cluster_utils.py b/python/ray/tests/cluster_utils.py index 76dfd3000..c0871be30 100644 --- a/python/ray/tests/cluster_utils.py +++ b/python/ray/tests/cluster_utils.py @@ -8,7 +8,6 @@ import time import redis import ray -from ray.gcs_utils import ClientTableData logger = logging.getLogger(__name__) @@ -176,8 +175,7 @@ class Cluster(object): while time.time() - start_time < timeout: clients = ray.state._parse_client_table(redis_client) live_clients = [ - client for client in clients - if client["EntryType"] == ClientTableData.INSERTION + client for client in clients if client["IsInsertion"] ] expected = len(self.list_all_nodes()) diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index bf4bd5550..08d4ac3d3 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -1149,12 +1149,12 @@ void ClientTableNotification(gcs::AsyncGcsClient *client, const ClientID &client ASSERT_EQ(client_id, added_id); ASSERT_EQ(ClientID::FromBinary(data.client_id()), added_id); ASSERT_EQ(ClientID::FromBinary(data.client_id()), added_id); - ASSERT_EQ(data.entry_type() == ClientTableData::INSERTION, is_insertion); + ASSERT_EQ(data.is_insertion(), is_insertion); ClientTableData cached_client; client->client_table().GetClient(added_id, cached_client); ASSERT_EQ(ClientID::FromBinary(cached_client.client_id()), added_id); - ASSERT_EQ(cached_client.entry_type() == ClientTableData::INSERTION, is_insertion); + 
ASSERT_EQ(cached_client.is_insertion(), is_insertion); } void TestClientTableConnect(const JobID &job_id, @@ -1273,29 +1273,24 @@ void TestHashTable(const JobID &job_id, std::shared_ptr cli const int expected_count = 14; ClientID client_id = ClientID::FromRandom(); // Prepare the first resource map: data_map1. - auto cpu_data = std::make_shared(); - cpu_data->set_resource_name("CPU"); - cpu_data->set_resource_capacity(100); - auto gpu_data = std::make_shared(); - gpu_data->set_resource_name("GPU"); - gpu_data->set_resource_capacity(2); DynamicResourceTable::DataMap data_map1; + auto cpu_data = std::make_shared(); + cpu_data->set_resource_capacity(100); data_map1.emplace("CPU", cpu_data); + auto gpu_data = std::make_shared(); + gpu_data->set_resource_capacity(2); data_map1.emplace("GPU", gpu_data); // Prepare the second resource map: data_map2 which decreases CPU, // increases GPU and add a new CUSTOM compared to data_map1. - auto data_cpu = std::make_shared(); - data_cpu->set_resource_name("CPU"); - data_cpu->set_resource_capacity(50); - auto data_gpu = std::make_shared(); - data_gpu->set_resource_name("GPU"); - data_gpu->set_resource_capacity(10); - auto data_custom = std::make_shared(); - data_custom->set_resource_name("CUSTOM"); - data_custom->set_resource_capacity(2); DynamicResourceTable::DataMap data_map2; + auto data_cpu = std::make_shared(); + data_cpu->set_resource_capacity(50); data_map2.emplace("CPU", data_cpu); + auto data_gpu = std::make_shared(); + data_gpu->set_resource_capacity(10); data_map2.emplace("GPU", data_gpu); + auto data_custom = std::make_shared(); + data_custom->set_resource_capacity(2); data_map2.emplace("CUSTOM", data_custom); data_map2["CPU"]->set_resource_capacity(50); // This is a common comparison function for the test. 
@@ -1305,7 +1300,6 @@ void TestHashTable(const JobID &job_id, std::shared_ptr cli for (const auto &data : data1) { auto iter = data2.find(data.first); ASSERT_TRUE(iter != data2.end()); - ASSERT_EQ(iter->second->resource_name(), data.second->resource_name()); ASSERT_EQ(iter->second->resource_capacity(), data.second->resource_capacity()); } }; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 587ab937e..cc555420b 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -495,8 +495,7 @@ void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callbac client_added_callback_ = callback; // Call the callback for any added clients that are cached. for (const auto &entry : client_cache_) { - if (!entry.first.IsNil() && - (entry.second.entry_type() == ClientTableData::INSERTION)) { + if (!entry.first.IsNil() && (entry.second.is_insertion())) { client_added_callback_(client_, entry.first, entry.second); } } @@ -506,35 +505,12 @@ void ClientTable::RegisterClientRemovedCallback(const ClientTableCallback &callb client_removed_callback_ = callback; // Call the callback for any removed clients that are cached. for (const auto &entry : client_cache_) { - if (!entry.first.IsNil() && entry.second.entry_type() == ClientTableData::DELETION) { + if (!entry.first.IsNil() && !entry.second.is_insertion()) { client_removed_callback_(client_, entry.first, entry.second); } } } -void ClientTable::RegisterResourceCreateUpdatedCallback( - const ClientTableCallback &callback) { - resource_createupdated_callback_ = callback; - // Call the callback for any clients that are cached. 
- for (const auto &entry : client_cache_) { - if (!entry.first.IsNil() && - (entry.second.entry_type() == ClientTableData::RES_CREATEUPDATE)) { - resource_createupdated_callback_(client_, entry.first, entry.second); - } - } -} - -void ClientTable::RegisterResourceDeletedCallback(const ClientTableCallback &callback) { - resource_deleted_callback_ = callback; - // Call the callback for any clients that are cached. - for (const auto &entry : client_cache_) { - if (!entry.first.IsNil() && - entry.second.entry_type() == ClientTableData::RES_DELETE) { - resource_deleted_callback_(client_, entry.first, entry.second); - } - } -} - void ClientTable::HandleNotification(AsyncGcsClient *client, const ClientTableData &data) { ClientID client_id = ClientID::FromBinary(data.client_id()); @@ -548,81 +524,35 @@ void ClientTable::HandleNotification(AsyncGcsClient *client, } else { // If the entry is in the cache, then the notification is new if the client // was alive and is now dead or resources have been updated. - bool was_not_deleted = (entry->second.entry_type() != ClientTableData::DELETION); - bool is_deleted = (data.entry_type() == ClientTableData::DELETION); - bool is_res_modified = ((data.entry_type() == ClientTableData::RES_CREATEUPDATE) || - (data.entry_type() == ClientTableData::RES_DELETE)); - is_notif_new = (was_not_deleted && (is_deleted || is_res_modified)); + bool was_not_deleted = entry->second.is_insertion(); + bool is_deleted = !data.is_insertion(); + is_notif_new = was_not_deleted && is_deleted; // Once a client with a given ID has been removed, it should never be added // again. If the entry was in the cache and the client was deleted, check // that this new notification is not an insertion. 
- if (entry->second.entry_type() == ClientTableData::DELETION) { - RAY_CHECK((data.entry_type() == ClientTableData::DELETION)) + if (!entry->second.is_insertion()) { + RAY_CHECK(!data.is_insertion()) << "Notification for addition of a client that was already removed:" << client_id; } } // Add the notification to our cache. Notifications are idempotent. - // If it is a new client or a client removal, add as is - if ((data.entry_type() == ClientTableData::INSERTION) || - (data.entry_type() == ClientTableData::DELETION)) { - RAY_LOG(DEBUG) << "[ClientTableNotification] ClientTable Insertion/Deletion " - "notification for client id " - << client_id << ". EntryType: " << int(data.entry_type()) - << ". Setting the client cache to data."; - client_cache_[client_id] = data; - } else if ((data.entry_type() == ClientTableData::RES_CREATEUPDATE) || - (data.entry_type() == ClientTableData::RES_DELETE)) { - RAY_LOG(DEBUG) << "[ClientTableNotification] ClientTable RES_CREATEUPDATE " - "notification for client id " - << client_id << ". EntryType: " << int(data.entry_type()) - << ". Updating the client cache with the delta from the log."; - - ClientTableData &cache_data = client_cache_[client_id]; - // Iterate over all resources in the new create/update notification - for (std::vector::size_type i = 0; i != data.resources_total_label_size(); i++) { - auto const &resource_name = data.resources_total_label(i); - auto const &capacity = data.resources_total_capacity(i); - - // If resource exists in the ClientTableData, update it, else create it - auto existing_resource_label = - std::find(cache_data.resources_total_label().begin(), - cache_data.resources_total_label().end(), resource_name); - if (existing_resource_label != cache_data.resources_total_label().end()) { - auto index = std::distance(cache_data.resources_total_label().begin(), - existing_resource_label); - // Resource already exists, set capacity if updation call.. 
- if (data.entry_type() == ClientTableData::RES_CREATEUPDATE) { - cache_data.set_resources_total_capacity(index, capacity); - } - // .. delete if deletion call. - else if (data.entry_type() == ClientTableData::RES_DELETE) { - cache_data.mutable_resources_total_label()->erase( - cache_data.resources_total_label().begin() + index); - cache_data.mutable_resources_total_capacity()->erase( - cache_data.resources_total_capacity().begin() + index); - } - } else { - // Resource does not exist, create resource and add capacity if it was a resource - // create call. - if (data.entry_type() == ClientTableData::RES_CREATEUPDATE) { - cache_data.add_resources_total_label(resource_name); - cache_data.add_resources_total_capacity(capacity); - } - } - } - } + RAY_LOG(DEBUG) << "[ClientTableNotification] ClientTable Insertion/Deletion " + "notification for client id " + << client_id << ". IsInsertion: " << data.is_insertion() + << ". Setting the client cache to data."; + client_cache_[client_id] = data; // If the notification is new, call any registered callbacks. ClientTableData &cache_data = client_cache_[client_id]; if (is_notif_new) { - if (data.entry_type() == ClientTableData::INSERTION) { + if (data.is_insertion()) { if (client_added_callback_ != nullptr) { client_added_callback_(client, client_id, cache_data); } RAY_CHECK(removed_clients_.find(client_id) == removed_clients_.end()); - } else if (data.entry_type() == ClientTableData::DELETION) { + } else { // NOTE(swang): The client should be added to this data structure before // the callback gets called, in case the callback depends on the data // structure getting updated. 
@@ -630,14 +560,6 @@ void ClientTable::HandleNotification(AsyncGcsClient *client, if (client_removed_callback_ != nullptr) { client_removed_callback_(client, client_id, cache_data); } - } else if (data.entry_type() == ClientTableData::RES_CREATEUPDATE) { - if (resource_createupdated_callback_ != nullptr) { - resource_createupdated_callback_(client, client_id, cache_data); - } - } else if (data.entry_type() == ClientTableData::RES_DELETE) { - if (resource_deleted_callback_ != nullptr) { - resource_deleted_callback_(client, client_id, cache_data); - } } } } @@ -664,7 +586,7 @@ Status ClientTable::Connect(const ClientTableData &local_client) { // Construct the data to add to the client table. auto data = std::make_shared(local_client_); - data->set_entry_type(ClientTableData::INSERTION); + data->set_is_insertion(true); // Callback to handle our own successful connection once we've added // ourselves. auto add_callback = [this](AsyncGcsClient *client, const UniqueID &log_key, @@ -682,7 +604,7 @@ Status ClientTable::Connect(const ClientTableData &local_client) { for (auto &notification : notifications) { // This is temporary fix for Issue 4140 to avoid connect to dead nodes. // TODO(yuhguo): remove this temporary fix after GCS entry is removable.
- if (notification.entry_type() != ClientTableData::DELETION) { + if (notification.is_insertion()) { connected_nodes.emplace(notification.client_id(), notification); } else { auto iter = connected_nodes.find(notification.client_id()); @@ -713,7 +635,7 @@ Status ClientTable::Connect(const ClientTableData &local_client) { Status ClientTable::Disconnect(const DisconnectCallback &callback) { auto data = std::make_shared(local_client_); - data->set_entry_type(ClientTableData::DELETION); + data->set_is_insertion(false); auto add_callback = [this, callback](AsyncGcsClient *client, const ClientID &id, const ClientTableData &data) { HandleConnected(client, data); @@ -731,7 +653,7 @@ Status ClientTable::Disconnect(const DisconnectCallback &callback) { ray::Status ClientTable::MarkDisconnected(const ClientID &dead_client_id) { auto data = std::make_shared(); data->set_client_id(dead_client_id.Binary()); - data->set_entry_type(ClientTableData::DELETION); + data->set_is_insertion(false); return Append(JobID::Nil(), client_log_key_, data, nullptr); } @@ -812,8 +734,8 @@ template class Log; template class Table; template class Table; -template class Log; -template class Hash; +template class Log; +template class Hash; } // namespace gcs diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 364db09dc..189555d2a 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -32,7 +32,7 @@ using rpc::HeartbeatTableData; using rpc::JobTableData; using rpc::ObjectTableData; using rpc::ProfileTableData; -using rpc::RayResource; +using rpc::ResourceTableData; using rpc::TablePrefix; using rpc::TablePubsub; using rpc::TaskLeaseData; @@ -593,7 +593,7 @@ class Hash : private Log, using Log::num_lookups_; }; -class DynamicResourceTable : public Hash { +class DynamicResourceTable : public Hash { public: DynamicResourceTable(const std::vector> &contexts, AsyncGcsClient *client) @@ -872,16 +872,6 @@ class ClientTable : public Log { /// \param callback The callback to register. 
void RegisterClientRemovedCallback(const ClientTableCallback &callback); - /// Register a callback to call when a resource is created or updated. - /// - /// \param callback The callback to register. - void RegisterResourceCreateUpdatedCallback(const ClientTableCallback &callback); - - /// Register a callback to call when a resource is deleted. - /// - /// \param callback The callback to register. - void RegisterResourceDeletedCallback(const ClientTableCallback &callback); - /// Get a client's information from the cache. The cache only contains /// information for clients that we've heard a notification for. /// @@ -945,10 +935,6 @@ class ClientTable : public Log { ClientTableCallback client_added_callback_; /// The callback to call when a client is removed. ClientTableCallback client_removed_callback_; - /// The callback to call when a resource is created or updated. - ClientTableCallback resource_createupdated_callback_; - /// The callback to call when a resource is deleted. - ClientTableCallback resource_deleted_callback_; /// A cache for information about all clients. std::unordered_map client_cache_; /// The set of removed clients. 
diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index 5aa598924..8d28c0a84 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -111,7 +111,7 @@ void ObjectDirectory::LookupRemoteConnectionInfo( ClientID result_client_id = ClientID::FromBinary(client_data.client_id()); if (!result_client_id.IsNil()) { RAY_CHECK(result_client_id == connection_info.client_id); - if (client_data.entry_type() == ClientTableData::INSERTION) { + if (client_data.is_insertion()) { connection_info.ip = client_data.node_manager_address(); connection_info.port = static_cast(client_data.object_manager_port()); } diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 1dfb1a39a..9ed92f7ea 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -144,22 +144,12 @@ message ProfileTableData { repeated ProfileEvent profile_events = 4; } -message RayResource { - // The type of the resource. - string resource_name = 1; +message ResourceTableData { // The total capacity of this resource type. - double resource_capacity = 2; + double resource_capacity = 1; } message ClientTableData { - // Enum for the entry type in the ClientTable - enum EntryType { - INSERTION = 0; - DELETION = 1; - RES_CREATEUPDATE = 2; - RES_DELETE = 3; - } - // The client ID of the client that the message is about. bytes client_id = 1; // The IP address of the client's node manager. @@ -174,8 +164,9 @@ message ClientTableData { // The port at which the client's object manager is listening for TCP // connections from other object managers. int32 object_manager_port = 6; - // Enum to store the entry type in the log - EntryType entry_type = 7; + // True if the message is about the addition of a client and false if it is + // about the deletion of a client. + bool is_insertion = 7; // TODO(hchen): Define the following resources in map format. 
repeated string resources_total_label = 8; diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc index d26f33cba..39bc3f9fb 100644 --- a/src/ray/raylet/monitor.cc +++ b/src/ray/raylet/monitor.cc @@ -52,8 +52,7 @@ void Monitor::Tick() { const std::vector &all_data) { bool marked = false; for (const auto &data : all_data) { - if (client_id.Binary() == data.client_id() && - data.entry_type() == ClientTableData::DELETION) { + if (client_id.Binary() == data.client_id() && !data.is_insertion()) { // The node has been marked dead by itself. marked = true; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index da275f5a6..419bf4e3c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -189,19 +189,33 @@ ray::Status NodeManager::RegisterGcs() { const ClientTableData &data) { ClientRemoved(data); }; gcs_client_->client_table().RegisterClientRemovedCallback(node_manager_client_removed); - // Register a callback on the client table for resource create/update requests - auto node_manager_resource_createupdated = - [this](gcs::AsyncGcsClient *client, const UniqueID &id, - const ClientTableData &data) { ResourceCreateUpdated(data); }; - gcs_client_->client_table().RegisterResourceCreateUpdatedCallback( - node_manager_resource_createupdated); - - // Register a callback on the client table for resource delete requests - auto node_manager_resource_deleted = - [this](gcs::AsyncGcsClient *client, const UniqueID &id, - const ClientTableData &data) { ResourceDeleted(data); }; - gcs_client_->client_table().RegisterResourceDeletedCallback( - node_manager_resource_deleted); + // Subscribe to resource changes. 
+ const auto &resources_changed = + [this]( + gcs::AsyncGcsClient *client, const ClientID &id, + const gcs::GcsChangeMode change_mode, + const std::unordered_map> + &data) { + if (change_mode == gcs::GcsChangeMode::APPEND_OR_ADD) { + ResourceSet resource_set; + for (auto &entry : data) { + resource_set.AddOrUpdateResource(entry.first, + entry.second->resource_capacity()); + } + ResourceCreateUpdated(id, resource_set); + } + if (change_mode == gcs::GcsChangeMode::REMOVE) { + std::vector resource_names; + for (auto &entry : data) { + resource_names.push_back(entry.first); + } + ResourceDeleted(id, resource_names); + } + }; + RAY_RETURN_NOT_OK( + gcs_client_->resource_table().Subscribe(JobID::Nil(), ClientID::Nil(), + /*subscribe_callback=*/resources_changed, + /*done_callback=*/nullptr)); // Subscribe to heartbeat batches from the monitor. const auto &heartbeat_batch_added = @@ -378,10 +392,19 @@ void NodeManager::ClientAdded(const ClientTableData &client_data) { client_data.node_manager_port(), client_call_manager_)); remote_node_manager_clients_.emplace(client_id, std::move(client)); - ResourceSet resources_total( - rpc::VectorFromProtobuf(client_data.resources_total_label()), - rpc::VectorFromProtobuf(client_data.resources_total_capacity())); - cluster_resource_map_.emplace(client_id, SchedulingResources(resources_total)); + // Fetch resource info for the remote client and update cluster resource map. 
+ RAY_CHECK_OK(gcs_client_->resource_table().Lookup( + JobID::Nil(), client_id, + [this](gcs::AsyncGcsClient *client, const ClientID &client_id, + const std::unordered_map> &pairs) { + ResourceSet resource_set; + for (auto &resource_entry : pairs) { + resource_set.AddOrUpdateResource(resource_entry.first, + resource_entry.second->resource_capacity()); + } + ResourceCreateUpdated(client_id, resource_set); + })); } void NodeManager::ClientRemoved(const ClientTableData &client_data) { @@ -433,25 +456,18 @@ void NodeManager::ClientRemoved(const ClientTableData &client_data) { lineage_cache_.FlushAllUncommittedTasks(); } -void NodeManager::ResourceCreateUpdated(const ClientTableData &client_data) { - const ClientID client_id = ClientID::FromBinary(client_data.client_id()); +void NodeManager::ResourceCreateUpdated(const ClientID &client_id, + const ResourceSet &createUpdatedResources) { const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from client id " - << client_id << ". Updating resource map."; - ResourceSet new_res_set( - rpc::VectorFromProtobuf(client_data.resources_total_label()), - rpc::VectorFromProtobuf(client_data.resources_total_capacity())); - - const ResourceSet &old_res_set = cluster_resource_map_[client_id].GetTotalResources(); - ResourceSet difference_set = old_res_set.FindUpdatedResources(new_res_set); - RAY_LOG(DEBUG) << "[ResourceCreateUpdated] The difference in the resource map is " - << difference_set.ToString(); + << client_id << " with created or updated resources: " + << createUpdatedResources.ToString() << ". 
Updating resource map."; SchedulingResources &cluster_schedres = cluster_resource_map_[client_id]; // Update local_available_resources_ and SchedulingResources - for (const auto &resource_pair : difference_set.GetResourceMap()) { + for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &new_resource_capacity = resource_pair.second; @@ -470,28 +486,24 @@ void NodeManager::ResourceCreateUpdated(const ClientTableData &client_data) { return; } -void NodeManager::ResourceDeleted(const ClientTableData &client_data) { - const ClientID client_id = ClientID::FromBinary(client_data.client_id()); +void NodeManager::ResourceDeleted(const ClientID &client_id, + const std::vector &resource_names) { const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); - ResourceSet new_res_set( - rpc::VectorFromProtobuf(client_data.resources_total_label()), - rpc::VectorFromProtobuf(client_data.resources_total_capacity())); - RAY_LOG(DEBUG) << "[ResourceDeleted] received callback from client id " << client_id - << " with new resources: " << new_res_set.ToString() - << ". Updating resource map."; - - const ResourceSet &old_res_set = cluster_resource_map_[client_id].GetTotalResources(); - ResourceSet deleted_set = old_res_set.FindDeletedResources(new_res_set); - RAY_LOG(DEBUG) << "[ResourceDeleted] The difference in the resource map is " - << deleted_set.ToString(); + if (RAY_LOG_ENABLED(DEBUG)) { + std::ostringstream oss; + for (auto &resource_name : resource_names) { + oss << resource_name << ", "; + } + RAY_LOG(DEBUG) << "[ResourceDeleted] received callback from client id " << client_id + << " with deleted resources: " << oss.str() + << ". 
Updating resource map."; + } SchedulingResources &cluster_schedres = cluster_resource_map_[client_id]; // Update local_available_resources_ and SchedulingResources - for (const auto &resource_pair : deleted_set.GetResourceMap()) { - const std::string &resource_label = resource_pair.first; - + for (const auto &resource_label : resource_names) { cluster_schedres.DeleteResource(resource_label); if (client_id == local_client_id) { local_available_resources_.DeleteResource(resource_label); @@ -1264,31 +1276,19 @@ void NodeManager::ProcessSetResourceRequest( return; } - // Add the new resource to a skeleton ClientTableData object - ClientTableData data; - gcs_client_->client_table().GetClient(client_id, data); - // Replace the resource vectors with the resource deltas from the message. - // RES_CREATEUPDATE and RES_DELETE entries in the ClientTable track changes (deltas) in - // the resources - data.add_resources_total_label(resource_name); - data.add_resources_total_capacity(capacity); - // Set the correct flag for entry_type + // Submit to the client table. This calls the ResourceCreateUpdated or ResourceDeleted + // callback, which updates cluster_resource_map_. if (is_deletion) { - data.set_entry_type(ClientTableData::RES_DELETE); + RAY_CHECK_OK(gcs_client_->resource_table().RemoveEntries(JobID::Nil(), client_id, + {resource_name}, nullptr)); } else { - data.set_entry_type(ClientTableData::RES_CREATEUPDATE); + std::unordered_map> data_map; + auto resource_table_data = std::make_shared(); + resource_table_data->set_resource_capacity(capacity); + data_map.emplace(resource_name, resource_table_data); + RAY_CHECK_OK( + gcs_client_->resource_table().Update(JobID::Nil(), client_id, data_map, nullptr)); } - - // Submit to the client table. This calls the ResourceCreateUpdated callback, which - // updates cluster_resource_map_. 
- std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); - if (not worker) { - worker = worker_pool_.GetRegisteredDriver(client); - } - auto data_shared_ptr = std::make_shared(data); - auto client_table = gcs_client_->client_table(); - RAY_CHECK_OK(gcs_client_->client_table().Append( - JobID::Nil(), client_table.client_log_key_, data_shared_ptr, nullptr)); } void NodeManager::ScheduleTasks( diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 3c30c20c4..7fb10d642 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -130,14 +130,18 @@ class NodeManager : public rpc::NodeManagerServiceHandler { void ClientRemoved(const ClientTableData &client_data); /// Handler for the addition or updation of a resource in the GCS - /// \param client_data Data associated with the new client. + /// \param client_id ID of the node that created or updated resources. + /// \param createUpdatedResources Created or updated resources. /// \return Void. - void ResourceCreateUpdated(const ClientTableData &client_data); + void ResourceCreateUpdated(const ClientID &client_id, + const ResourceSet &createUpdatedResources); /// Handler for the deletion of a resource in the GCS - /// \param client_data Data associated with the new client. + /// \param client_id ID of the node that deleted resources. + /// \param resource_names Names of deleted resources. /// \return Void. - void ResourceDeleted(const ClientTableData &client_data); + void ResourceDeleted(const ClientID &client_id, + const std::vector &resource_names); /// Evaluates the local infeasible queue to check if any tasks can be scheduled. /// This is called whenever there's an update to the resources on the local client. 
diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 871dc5049..0544a5674 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -86,11 +86,6 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address, client_info.set_object_store_socket_name(object_store_socket_name); client_info.set_object_manager_port(object_manager_.GetServerPort()); client_info.set_node_manager_port(node_manager_.GetServerPort()); - // Add resource information. - for (const auto &resource_pair : node_manager_config.resource_config.GetResourceMap()) { - client_info.add_resources_total_label(resource_pair.first); - client_info.add_resources_total_capacity(resource_pair.second); - } RAY_LOG(DEBUG) << "Node manager " << gcs_client_->client_table().GetLocalClientId() << " started on " << client_info.node_manager_address() << ":" @@ -100,6 +95,16 @@ ray::Status Raylet::RegisterGcs(const std::string &node_ip_address, ; RAY_RETURN_NOT_OK(gcs_client_->client_table().Connect(client_info)); + // Add resource information. 
+ std::unordered_map> resources; + for (const auto &resource_pair : node_manager_config.resource_config.GetResourceMap()) { + auto resource = std::make_shared(); + resource->set_resource_capacity(resource_pair.second); + resources.emplace(resource_pair.first, resource); + } + RAY_RETURN_NOT_OK(gcs_client_->resource_table().Update( + JobID::Nil(), gcs_client_->client_table().GetLocalClientId(), resources, nullptr)); + RAY_RETURN_NOT_OK(node_manager_.RegisterGcs()); return Status::OK(); diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index c80282601..6adbbb37c 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -284,44 +284,6 @@ const std::unordered_map return resource_capacity_; }; -ResourceSet ResourceSet::FindUpdatedResources( - const ray::raylet::ResourceSet &new_resource_set) const { - // Find any new resources and return a ResourceSet with the resource and new capacities - ResourceSet updated_resource_set; - for (const auto &resource_pair : new_resource_set.GetResourceAmountMap()) { - const std::string &resource_label = resource_pair.first; - const FractionalResourceQuantity &new_resource_capacity = resource_pair.second; - if (resource_capacity_.count(resource_label) == 1) { - // Resource exists, check if updated - const FractionalResourceQuantity &old_resource_capacity = - resource_capacity_.at(resource_label); - if (old_resource_capacity != new_resource_capacity) { - updated_resource_set.AddOrUpdateResource(resource_label, new_resource_capacity); - } - } else { - // Resource does not exist in the old set, add to return set - updated_resource_set.AddOrUpdateResource(resource_label, new_resource_capacity); - } - } - return updated_resource_set; -} - -ResourceSet ResourceSet::FindDeletedResources( - const ray::raylet::ResourceSet &new_resource_set) const { - // Find any new resources and return a ResourceSet with the resource and new capacities - ResourceSet 
deleted_resource_set; - auto &new_resource_map = new_resource_set.GetResourceAmountMap(); - for (const auto &resource_pair : resource_capacity_) { - const std::string &resource_label = resource_pair.first; - const FractionalResourceQuantity &old_resource_capacity = resource_pair.second; - if (new_resource_map.count(resource_label) != 1) { - // Resource does not exist, add to return set - deleted_resource_set.AddOrUpdateResource(resource_label, old_resource_capacity); - } - } - return deleted_resource_set; -} - /// ResourceIds class implementation ResourceIds::ResourceIds() {} diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index 9e3a2a64c..c06b8a194 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -158,18 +158,6 @@ class ResourceSet { /// \return Void. void SubtractResourcesStrict(const ResourceSet &other); - /// \brief Finds new resources created or updated in a new set. - /// - /// \param new_resource_set: The new resource set to compare with. - /// \return The ResourceSet of updated values - ResourceSet FindUpdatedResources(const ResourceSet &new_resource_set) const; - - /// \brief Finds resources deleted in a set. - /// - /// \param new_resource_set: The new resource set to compare with. - /// \return The ResourceSet of deleted resources with old capacities - ResourceSet FindDeletedResources(const ResourceSet &new_resource_set) const; - /// Return the capacity value associated with the specified resource. /// /// \param resource_name: Resource name for which capacity is requested. 
diff --git a/src/ray/util/logging.h b/src/ray/util/logging.h index 39428eba9..d61598260 100644 --- a/src/ray/util/logging.h +++ b/src/ray/util/logging.h @@ -10,6 +10,8 @@ enum class RayLogLevel { DEBUG = -1, INFO = 0, WARNING = 1, ERROR = 2, FATAL = 3 #define RAY_LOG_INTERNAL(level) ::ray::RayLog(__FILE__, __LINE__, level) +#define RAY_LOG_ENABLED(level) ray::RayLog::IsLevelEnabled(ray::RayLogLevel::level) + #define RAY_LOG(level) \ if (ray::RayLog::IsLevelEnabled(ray::RayLogLevel::level)) \ RAY_LOG_INTERNAL(ray::RayLogLevel::level)