From 68ac08332bbd018f53e8e97b7ad482bbd01a8e5d Mon Sep 17 00:00:00 2001 From: Ion Date: Fri, 22 Nov 2019 11:14:46 -0800 Subject: [PATCH] Initial commit of new cluster resource scheduler (#6178) --- BUILD.bazel | 2 + .../scheduling/cluster_resource_scheduler.cc | 196 +++++++ .../scheduling/cluster_resource_scheduler.h | 162 ++++++ src/ray/common/scheduling/scheduling_ids.cc | 43 +- src/ray/common/scheduling/scheduling_ids.h | 26 +- src/ray/common/scheduling/scheduling_test.cc | 545 +++++++++++++++++- 6 files changed, 926 insertions(+), 48 deletions(-) create mode 100644 src/ray/common/scheduling/cluster_resource_scheduler.cc create mode 100644 src/ray/common/scheduling/cluster_resource_scheduler.h diff --git a/BUILD.bazel b/BUILD.bazel index df9cd6d51..8237fd719 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -218,6 +218,8 @@ cc_library( ":ray_util", "@boost//:asio", "@com_github_grpc_grpc//:grpc++", + "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/container:flat_hash_set", "@com_google_googletest//:gtest", "@plasma//:plasma_client", ], diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.cc b/src/ray/common/scheduling/cluster_resource_scheduler.cc new file mode 100644 index 000000000..00069055a --- /dev/null +++ b/src/ray/common/scheduling/cluster_resource_scheduler.cc @@ -0,0 +1,196 @@ +#include "cluster_resource_scheduler.h" + +ClusterResourceScheduler::ClusterResourceScheduler( + int64_t local_node_id, const NodeResources &local_node_resources) + : local_node_id_(local_node_id) { + AddOrUpdateNode(local_node_id_, local_node_resources); +} + +void ClusterResourceScheduler::SetPredefinedResources(const NodeResources &new_resources, + NodeResources *old_resources) { + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + old_resources->capacities[i].total = new_resources.capacities[i].total; + old_resources->capacities[i].available = new_resources.capacities[i].available; + } +} + +void ClusterResourceScheduler::SetCustomResources( + const absl::flat_hash_map &new_custom_resources, + absl::flat_hash_map *old_custom_resources) { + old_custom_resources->clear(); + for (auto &elem : new_custom_resources) { + old_custom_resources->insert(elem); + } +} + +void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id, + const NodeResources &node_resources) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + // This node is new, so add it to the map. + nodes_.emplace(node_id, node_resources); + } else { + // This node exists, so update its resources. + NodeResources &resources = it->second; + SetPredefinedResources(node_resources, &resources); + SetCustomResources(node_resources.custom_resources, &resources.custom_resources); + } +} + +bool ClusterResourceScheduler::RemoveNode(int64_t node_id) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + // Node not found. + return false; + } else { + it->second.custom_resources.clear(); + nodes_.erase(it); + return true; + } +} + +int64_t ClusterResourceScheduler::IsSchedulable(const TaskRequest &task_req, + int64_t node_id, + const NodeResources &resources) { + int violations = 0; + + // First, check predefined resources. + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + if (task_req.predefined_resources[i].demand > resources.capacities[i].available) { + if (task_req.predefined_resources[i].soft) { + // A soft constraint has been violated. + violations++; + } else { + // A hard constraint has been violated. + return -1; + } + } + } + + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto it = resources.custom_resources.find(task_req.custom_resources[i].id); + + if (it == resources.custom_resources.end()) { + // Requested resource doesn't exist at this node. + if (task_req.custom_resources[i].req.soft) { + violations++; + } else { + return -1; + } + } else { + if (task_req.custom_resources[i].req.demand > it->second.available) { + // Resource constraint is violated. + if (task_req.custom_resources[i].req.soft) { + violations++; + } else { + return -1; + } + } + } + } + + if (task_req.placement_hints.size() > 0) { + auto it_p = task_req.placement_hints.find(node_id); + if (it_p == task_req.placement_hints.end()) { + // Node not found in the placement_hints list, so + // record this a soft constraint violation. + violations++; + } + } + + return violations; +} + +int64_t ClusterResourceScheduler::GetBestSchedulableNode(const TaskRequest &task_req, + int64_t *total_violations) { + // Min number of violations across all nodes that can schedule the request. + int64_t min_violations = INT_MAX; + // Node associated to min_violations. + int64_t best_node = -1; + *total_violations = 0; + + // Check whether local node is schedulable. We return immediately + // the local node only if there are zero violations. + auto it = nodes_.find(local_node_id_); + if (it != nodes_.end()) { + if (IsSchedulable(task_req, it->first, it->second) == 0) { + return local_node_id_; + } + } + + // Check whether any node in the request placement_hints, satisfes + // all resource constraints of the request. + for (auto it_p = task_req.placement_hints.begin(); + it_p != task_req.placement_hints.end(); ++it_p) { + auto it = nodes_.find(*it_p); + if (it != nodes_.end()) { + if (IsSchedulable(task_req, it->first, it->second) == 0) { + return it->first; + } + } + } + + for (auto it = nodes_.begin(); it != nodes_.end(); ++it) { + // Return -1 if node not schedulable. otherwise return the number + // of soft constraint violations. + int64_t violations; + + if ((violations = IsSchedulable(task_req, it->first, it->second)) == -1) { + break; + } + // Update the node with the smallest number of soft constraints violated. + if (min_violations > violations) { + min_violations = violations; + best_node = it->first; + } + if (violations == 0) { + *total_violations = 0; + return best_node; + } + } + *total_violations = min_violations; + return best_node; +} + +bool ClusterResourceScheduler::SubtractNodeAvailableResources( + int64_t node_id, const TaskRequest &task_req) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + return false; + } + NodeResources &resources = it->second; + + // Just double check this node can still schedule the task request. + if (IsSchedulable(task_req, local_node_id_, resources) == -1) { + return false; + } + + for (size_t i = 0; i < PredefinedResources_MAX; i++) { + resources.capacities[i].available = + std::max(static_cast(0), resources.capacities[i].available - + task_req.predefined_resources[i].demand); + } + + for (size_t i = 0; i < task_req.custom_resources.size(); i++) { + auto it = resources.custom_resources.find(task_req.custom_resources[i].id); + if (it != resources.custom_resources.end()) { + it->second.available = + std::max(static_cast(0), + it->second.available - task_req.custom_resources[i].req.demand); + } + } + return true; +} + +bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, + NodeResources *ret_resources) { + auto it = nodes_.find(node_id); + if (it != nodes_.end()) { + *ret_resources = it->second; + return true; + } else { + return false; + } +} + +int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); } diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.h b/src/ray/common/scheduling/cluster_resource_scheduler.h new file mode 100644 index 000000000..b90fff6c8 --- /dev/null +++ b/src/ray/common/scheduling/cluster_resource_scheduler.h @@ -0,0 +1,162 @@ +#ifndef RAY_COMMON_SCHEDULING_SCHEDULING_H +#define RAY_COMMON_SCHEDULING_SCHEDULING_H + +#include "absl/container/flat_hash_map.h" +#include "absl/container/flat_hash_set.h" + +#include +#include + +/// List of predefined resources. +enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX }; + +struct ResourceCapacity { + int64_t total; + int64_t available; +}; + +struct ResourceRequest { + /// Amount of resource being requested. + int64_t demand; + /// Specify whether the request is soft or hard. + /// If hard, the entire request is denied if the demand exceeds the resource + /// availability. Otherwise, the request can be still be granted. + /// Prefernces are given to the nodes with the lowest number of violations. + bool soft; +}; + +/// Resource request, including resource ID. This is used for custom resources. +struct ResourceRequestWithId { + /// Resource ID. + int64_t id; + /// Resource request. + ResourceRequest req; +}; + +struct NodeResources { + /// Available and total capacities for predefined resources. + std::vector capacities; + /// Map containing custom resources. The key of each entry represents the + /// custom resource ID. + absl::flat_hash_map custom_resources; +}; + +struct TaskRequest { + /// List of predefined resources required by the task. + std::vector predefined_resources; + /// List of custom resources required by the tasl. + std::vector custom_resources; + /// List of placement hints. A placement hint is a node on which + /// we desire to run this task. This is a soft constraint in that + /// the task will run on a different node in the cluster, if none of the + /// nodes in this list can schedule this task. + absl::flat_hash_set placement_hints; +}; + +/// Class encapsulating the cluster resources and the logic to assign +/// tasks to nodes based on the task's constraints and the available +/// resources at those nodes. +class ClusterResourceScheduler { + /// List of nodes in the clusters and their resources organized as a map. + /// The key of the map is the node ID. + absl::flat_hash_map nodes_; + /// ID of local node. + int64_t local_node_id_; + + /// Set predefined resources. + /// + /// \param[in] new_resources: New predefined resources. + /// \param[out] old_resources: Predefined resources to be updated. + void SetPredefinedResources(const NodeResources &new_resources, + NodeResources *old_resources); + /// Set custom resources. + /// + /// \param[in] new_resources: New custom resources. + /// \param[out] old_resources: Custom resources to be updated. + void SetCustomResources( + const absl::flat_hash_map &new_custom_resources, + absl::flat_hash_map *old_custom_resources); + + public: + ClusterResourceScheduler(void){}; + + /// Constructor initializing the resources associated with the local node. + /// + /// \param local_node_id: ID of local node, + /// \param node_resources: The total and the available resources associated + /// with the local node. + ClusterResourceScheduler(int64_t local_node_id, const NodeResources &node_resources); + + /// Add a new node or overwrite the resources of an existing node. + /// + /// \param node_id: Node ID. + /// \param node_resources: Up to date total and available resources of the node. + void AddOrUpdateNode(int64_t node_id, const NodeResources &node_resources); + + /// Remove node from the cluster data structure. This happens + /// when a node fails or it is removed from the cluster. + /// + /// \param ID of the node to be removed. + bool RemoveNode(int64_t node_id); + + /// Check whether a task request can be scheduled given a node. + /// + /// \param task_req: Task request to be scheduled. + /// \param node_id: ID of the node. + /// \param resources: Node's resources. (Note: Technically, this is + /// redundant, as we can get the node's resources from nodes_ + /// using node_id. However, typically both node_id and resources + /// are available when we call this function, and this way we avoid + /// a map find call which could be expensive.) + /// + /// \return: -1, if the request cannot be scheduled. This happens when at + /// least a hard constraints is violated. + /// >= 0, the number soft constraint violations. If 0, no + /// constraint is violated. + int64_t IsSchedulable(const TaskRequest &task_req, int64_t node_id, + const NodeResources &resources); + + /// Find a node in the cluster on which we can schedule a given task request. + /// + /// First, this function checks whether the local node can schedule + /// the request without violating any constraints. If yes, it returns the + /// ID of the local node. + /// + /// If not, this function checks whether there is another node in the cluster + /// that satisfies all request's constraints (both soft and hard). + /// + /// If no such node exists, the function checks whether there are nodes + /// that satisfy all the request's hard constraints, but might violate some + /// soft constraints. Among these nodes, it returns a node which violates + /// the least number of soft constraints. + /// + /// Finally, if no such node exists, return -1. + /// + /// \param task_req: Task to be scheduled. + /// \param violations: The number of soft constraint violations associated + /// with the node returned by this function (assuming + /// a node that can schedule task_req is found). + /// + /// \return -1, if no node can schedule the current request; otherwise, + /// return the ID of a node that can schedule the task request. + int64_t GetBestSchedulableNode(const TaskRequest &task_req, int64_t *violations); + + /// Update the available resources of a node when a task request is + /// scheduled on the given node. + /// + /// \param node_id: ID of node on which request is being scheduled. + /// \param task_req: task request being scheduled. + /// + /// \return true, if task_req can be indeed scheduled on the node, + /// and false otherwise. + bool SubtractNodeAvailableResources(int64_t node_id, const TaskRequest &task_req); + + /// Return resources associated to the given node_id in ret_resources. + /// If node_id not found, return false; otherwise return true. + bool GetNodeResources(int64_t node_id, NodeResources *ret_resources); + + /// Get number of nodes in the cluster. + int64_t NumNodes(); +}; + +#endif // RAY_COMMON_SCHEDULING_SCHEDULING_H diff --git a/src/ray/common/scheduling/scheduling_ids.cc b/src/ray/common/scheduling/scheduling_ids.cc index b8591c55d..c43f38049 100644 --- a/src/ray/common/scheduling/scheduling_ids.cc +++ b/src/ray/common/scheduling/scheduling_ids.cc @@ -1,8 +1,7 @@ #include "scheduling_ids.h" -using namespace std; -int64_t StringIdMap::get(const string sid) { - auto it = string_to_int_.find(sid); +int64_t StringIdMap::Get(const std::string &string_id) { + auto it = string_to_int_.find(string_id); if (it == string_to_int_.end()) { return -1; } else { @@ -10,19 +9,25 @@ int64_t StringIdMap::get(const string sid) { } }; -int64_t StringIdMap::insert(const string sid, bool test) { - auto sit = string_to_int_.find(sid); +int64_t StringIdMap::Insert(const std::string &string_id, uint8_t max_id) { + auto sit = string_to_int_.find(string_id); if (sit == string_to_int_.end()) { - int64_t id = test ? hasher_(sid) % 10 : hasher_(sid); - for (int i = 0; true; i++) { + int64_t id = hasher_(string_id); + if (max_id != 0) { + id = id % MAX_ID_TEST; + } + for (size_t i = 0; true; i++) { auto it = int_to_string_.find(id); if (it == int_to_string_.end()) { - /// No hash collision, so associated sid with id. - string_to_int_.insert(make_pair(sid, id)); - int_to_string_.insert(make_pair(id, sid)); + /// No hash collision, so associate string_id with id. + string_to_int_.emplace(string_id, id); + int_to_string_.emplace(id, string_id); break; } - id = test ? hasher_(sid + to_string(i)) % 10 : hasher_(sid + to_string(i)); + id = hasher_(string_id + std::to_string(i)); + if (max_id != 0) { + id = id % max_id; + } } return id; } else { @@ -30,24 +35,20 @@ int64_t StringIdMap::insert(const string sid, bool test) { } }; -void StringIdMap::remove(const string sid) { - auto sit = string_to_int_.find(sid); +void StringIdMap::Remove(const std::string &string_id) { + auto sit = string_to_int_.find(string_id); if (sit != string_to_int_.end()) { - uint64_t id = string_to_int_[sid]; + int_to_string_.erase(string_to_int_[string_id]); string_to_int_.erase(sit); - auto it = int_to_string_.find(id); - int_to_string_.erase(it); } }; -void StringIdMap::remove(int64_t id) { +void StringIdMap::Remove(int64_t id) { auto it = int_to_string_.find(id); if (it != int_to_string_.end()) { - string sid = int_to_string_[id]; + string_to_int_.erase(int_to_string_[id]); int_to_string_.erase(it); - auto sit = string_to_int_.find(sid); - string_to_int_.erase(sit); } }; -int64_t StringIdMap::count() { return string_to_int_.size(); } +int64_t StringIdMap::Count() { return string_to_int_.size(); } diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/common/scheduling/scheduling_ids.h index 337bdb564..469aac7f8 100644 --- a/src/ray/common/scheduling/scheduling_ids.h +++ b/src/ray/common/scheduling/scheduling_ids.h @@ -1,13 +1,17 @@ #ifndef RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H #define RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H +#include "absl/container/flat_hash_map.h" + #include -#include + +/// Limit the ID range to test for collisions. +#define MAX_ID_TEST 8 /// Class to map string IDs to unique integer IDs and back. class StringIdMap { - std::unordered_map string_to_int_; - std::unordered_map int_to_string_; + absl::flat_hash_map string_to_int_; + absl::flat_hash_map int_to_string_; std::hash hasher_; public: @@ -18,28 +22,28 @@ class StringIdMap { /// /// \param String ID. /// \return The integer ID associated with the given string ID. - int64_t get(const std::string sid); + int64_t Get(const std::string &string_id); /// Insert a string ID and get the associated integer ID. /// /// \param String ID to be inserted. - /// \param test: if "true" it specifies that the range of - /// IDs is limited to 0..10 for testing purposes. - /// \return The integer ID associated with string ID sid. - int64_t insert(const std::string sid, bool test = false); + /// \param max_id The number of unique possible ids. This is used + /// to force collisions for testing. If -1, it is not used. + /// \return The integer ID associated with string ID string_id. + int64_t Insert(const std::string &string_id, uint8_t num_ids = 0); /// Delete an ID identified by its string format. /// /// \param ID to be deleted. - void remove(const std::string sid); + void Remove(const std::string &string_id); /// Delete an ID identified by its integer format. /// /// \param ID to be deleted. - void remove(int64_t id); + void Remove(int64_t id); /// Get number of identifiers. - int64_t count(); + int64_t Count(); }; #endif // RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H diff --git a/src/ray/common/scheduling/scheduling_test.cc b/src/ray/common/scheduling/scheduling_test.cc index 386520b3c..0731a5267 100644 --- a/src/ray/common/scheduling/scheduling_test.cc +++ b/src/ray/common/scheduling/scheduling_test.cc @@ -2,11 +2,144 @@ #include "gtest/gtest.h" #include -#include +#include "ray/common/scheduling/cluster_resource_scheduler.h" #include "ray/common/scheduling/scheduling_ids.h" + +#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION +#include +#include "absl/container/flat_hash_map.h" +#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION + using namespace std; +/// Used to path empty vector argiuments. +vector EmptyIntVector; +vector EmptyBoolVector; + +void initTaskRequest(TaskRequest &tr, vector &pred_demands, + vector &pred_soft, vector &cust_ids, + vector &cust_demands, vector &cust_soft, + vector &placement_hints) { + for (size_t i = 0; i < pred_demands.size(); i++) { + ResourceRequest rq; + rq.demand = pred_demands[i]; + rq.soft = pred_soft[i]; + tr.predefined_resources.push_back(rq); + } + + for (size_t i = pred_demands.size(); i < PredefinedResources_MAX; i++) { + ResourceRequest rq; + rq.demand = 0; + rq.soft = 0; + tr.predefined_resources.push_back(rq); + } + + for (size_t i = 0; i < cust_ids.size(); i++) { + ResourceRequestWithId rq; + rq.id = cust_ids[i]; + rq.req.demand = cust_demands[i]; + rq.req.soft = cust_soft[i]; + tr.custom_resources.push_back(rq); + } + + for (size_t i = 0; i < placement_hints.size(); i++) { + tr.placement_hints.insert(placement_hints[i]); + } +}; + +void initNodeResources(NodeResources &node, vector &pred_capacities, + vector &cust_ids, vector &cust_capacities) { + for (size_t i = 0; i < pred_capacities.size(); i++) { + ResourceCapacity rc; + rc.total = rc.available = pred_capacities[i]; + node.capacities.push_back(rc); + } + + if (pred_capacities.size() < PredefinedResources_MAX) { + for (int i = pred_capacities.size(); i < PredefinedResources_MAX; i++) { + ResourceCapacity rc; + rc.total = rc.available = 0; + node.capacities.push_back(rc); + } + } + + ResourceCapacity rc; + for (size_t i = 0; i < cust_capacities.size(); i++) { + rc.total = rc.available = cust_capacities[i]; + node.custom_resources.insert(pair(cust_ids[i], rc)); + } +} + +void initCluster(ClusterResourceScheduler &cluster_resources, int n) { + vector pred_capacities; + vector cust_ids; + vector cust_capacities; + int i, k; + + for (i = 0; i < n; i++) { + NodeResources node_resources; + + for (k = 0; k < PredefinedResources_MAX; k++) { + if (rand() % 3 == 0) { + pred_capacities.push_back(0); + } else { + pred_capacities.push_back(rand() % 10); + } + } + + int m = min(rand() % PredefinedResources_MAX, n); + + int start = rand() % n; + for (k = 0; k < m; k++) { + cust_ids.push_back((start + k) % n); + cust_capacities.push_back(rand() % 10); + } + + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + + cluster_resources.AddOrUpdateNode(i, node_resources); + + node_resources.custom_resources.clear(); + } +} + +bool nodeResourcesEqual(const NodeResources &nr1, const NodeResources &nr2) { + if (nr1.capacities.size() != nr2.capacities.size()) { + cout << nr1.capacities.size() << " " << nr2.capacities.size() << endl; + return false; + } + + for (size_t i = 0; i < nr1.capacities.size(); i++) { + if (nr1.capacities[i].available != nr2.capacities[i].available) { + return false; + } + if (nr1.capacities[i].total != nr2.capacities[i].total) { + return false; + } + } + + if (nr1.custom_resources.size() != nr2.custom_resources.size()) { + return false; + } + + auto cr1 = nr1.custom_resources; + auto cr2 = nr2.custom_resources; + for (auto it1 = cr1.begin(); it1 != cr1.end(); ++it1) { + auto it2 = cr2.find(it1->first); + if (it2 == cr2.end()) { + return false; + } + if (it1->second.total != it2->second.total) { + return false; + } + if (it1->second.available != it2->second.available) { + return false; + } + } + return true; +} + namespace ray { class SchedulingTest : public ::testing::Test { @@ -19,33 +152,413 @@ class SchedulingTest : public ::testing::Test { TEST_F(SchedulingTest, SchedulingIdTest) { StringIdMap ids; hash hasher; - int num = 10; // should be greater than 10. + size_t num = 10; // should be greater than 10. - for (int i = 0; i < num; i++) { - ids.insert(to_string(i)); + for (size_t i = 0; i < num; i++) { + ids.Insert(to_string(i)); } - ASSERT_EQ(ids.count(), num); + ASSERT_EQ(ids.Count(), num); - ids.remove(to_string(1)); - ASSERT_EQ(ids.count(), num - 1); + ids.Remove(to_string(1)); + ASSERT_EQ(ids.Count(), num - 1); - ids.remove(hasher(to_string(2))); - ASSERT_EQ(ids.count(), num - 2); + ids.Remove(hasher(to_string(2))); + ASSERT_EQ(ids.Count(), num - 2); - ASSERT_TRUE(ids.get(to_string(3)) == static_cast(hasher(to_string(3)))); + ASSERT_TRUE(ids.Get(to_string(3)) == static_cast(hasher(to_string(3)))); - ASSERT_TRUE(ids.get(to_string(100)) == -1); + ASSERT_TRUE(ids.Get(to_string(100)) == -1); /// Test for handling collision. StringIdMap short_ids; - for (int i = 0; i < 10; i++) { - /// "true" reduces the range of IDs to [0..9] - int64_t id = short_ids.insert(to_string(i), true); - ASSERT_TRUE(id < 10); + uint8_t max_id = 8; + for (size_t i = 0; i < max_id; i++) { + int64_t id = short_ids.Insert(to_string(i), max_id); + ASSERT_TRUE(id < max_id); } - ASSERT_EQ(short_ids.count(), 10); + ASSERT_EQ(short_ids.Count(), max_id); } +TEST_F(SchedulingTest, SchedulingInitClusterTest) { + int num_nodes = 10; + ClusterResourceScheduler cluster_resources; + + initCluster(cluster_resources, num_nodes); + + ASSERT_EQ(cluster_resources.NumNodes(), num_nodes); +} + +TEST_F(SchedulingTest, SchedulingDeleteClusterNodeTest) { + int num_nodes = 4; + int64_t remove_id = 2; + + ClusterResourceScheduler cluster_resources; + + initCluster(cluster_resources, num_nodes); + cluster_resources.RemoveNode(remove_id); + + ASSERT_TRUE(num_nodes - 1 == cluster_resources.NumNodes()); +} + +TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) { + int num_nodes = 4; + int64_t update_id = 2; + ClusterResourceScheduler cluster_resources; + + initCluster(cluster_resources, num_nodes); + + NodeResources node_resources; + vector pred_capacities; + vector cust_ids; + vector cust_capacities; + int k; + + for (k = 0; k < PredefinedResources_MAX; k++) { + if (rand() % 3 == 0) { + pred_capacities.push_back(0); + } else { + pred_capacities.push_back(rand() % 10); + } + } + + int m = min(rand() % PredefinedResources_MAX, num_nodes); + + int start = rand() % num_nodes; + for (k = 0; k < m; k++) { + cust_ids.push_back((start + k) % num_nodes); + cust_capacities.push_back(rand() % 10); + + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + cluster_resources.AddOrUpdateNode(update_id, node_resources); + } + ASSERT_TRUE(num_nodes == cluster_resources.NumNodes()); +} + +TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) { + /// Create cluster resources. + NodeResources node_resources; + vector pred_capacities{10, 5, 3}; + vector cust_ids{1, 2}; + vector cust_capacities{5, 5}; + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + ClusterResourceScheduler cluster_resources(1, node_resources); + + { + TaskRequest task_req; +#define PRED_CUSTOM_LEN 2 + vector pred_demands{7, 7}; + vector pred_soft{false, true}; + vector cust_ids{1, 2}; + vector cust_demands{3, 10}; + vector cust_soft{false, true}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations > 0); + + NodeResources nr1, nr2; + ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr1)); + cluster_resources.SubtractNodeAvailableResources(node_id, task_req); + ASSERT_TRUE(cluster_resources.GetNodeResources(node_id, &nr2)); + + for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) { + int64_t t = nr1.capacities[i].available - task_req.predefined_resources[i].demand; + if (t < 0) t = 0; + ASSERT_EQ(nr2.capacities[i].available, t); + } + + for (size_t i = 0; i < PRED_CUSTOM_LEN; i++) { + auto it1 = nr1.custom_resources.find(task_req.custom_resources[i].id); + if (it1 != nr1.custom_resources.end()) { + auto it2 = nr2.custom_resources.find(task_req.custom_resources[i].id); + if (it2 != nr2.custom_resources.end()) { + int64_t t = it1->second.available - task_req.custom_resources[i].req.demand; + if (t < 0) t = 0; + ASSERT_EQ(it2->second.available, t); + } + } + } + } +} + +TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) { + ClusterResourceScheduler cluster_resources; + NodeResources nr, nr_out; + int64_t node_id = 1; + + /// Add node. + { + NodeResources node_resources; + vector pred_capacities{10, 5, 3}; + vector cust_ids{1, 2}; + vector cust_capacities{5, 5}; + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + cluster_resources.AddOrUpdateNode(node_id, node_resources); + nr = node_resources; + } + + /// Check whether node resources were correctly added. + if (cluster_resources.GetNodeResources(node_id, &nr_out)) { + ASSERT_TRUE(nodeResourcesEqual(nr, nr_out)); + } else { + ASSERT_TRUE(false); + } + + /// Update node. + { + NodeResources node_resources; + vector pred_capacities{10, 10}; + vector cust_ids{2, 3}; + vector cust_capacities{6, 6}; + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + cluster_resources.AddOrUpdateNode(node_id, node_resources); + nr = node_resources; + } + if (cluster_resources.GetNodeResources(node_id, &nr_out)) { + ASSERT_TRUE(nodeResourcesEqual(nr, nr_out)); + } else { + ASSERT_TRUE(false); + } +} + +TEST_F(SchedulingTest, SchedulingTaskRequestTest) { + /// Create cluster resources containing local node. + NodeResources node_resources; + vector pred_capacities{5, 5}; + vector cust_ids{1}; + vector cust_capacities{10}; + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + ClusterResourceScheduler cluster_resources(0, node_resources); + + { + NodeResources node_resources; + vector pred_capacities{10, 2, 3}; + vector cust_ids{1, 2}; + vector cust_capacities{5, 5}; + initNodeResources(node_resources, pred_capacities, cust_ids, cust_capacities); + cluster_resources.AddOrUpdateNode(1, node_resources); + } + /// Predefined resources, hard constraint violation + { + TaskRequest task_req; + vector pred_demands = {11}; + vector pred_soft = {false}; + initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyIntVector, + EmptyBoolVector, EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_EQ(node_id, -1); + } + /// Predefined resources, soft constraint violation + { + TaskRequest task_req; + vector pred_demands = {11}; + vector pred_soft = {true}; + initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyIntVector, + EmptyBoolVector, EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations > 0); + } + + /// Predefined resources, no constraint violation. + { + TaskRequest task_req; + vector pred_demands = {5}; + vector pred_soft = {false}; + initTaskRequest(task_req, pred_demands, pred_soft, EmptyIntVector, EmptyIntVector, + EmptyBoolVector, EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations == 0); + } + /// Custom resources, hard constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{1}; + vector cust_demands{11}; + vector cust_soft{false}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id == -1); + } + /// Custom resources, soft constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{1}; + vector cust_demands{11}; + vector cust_soft{true}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations > 0); + } + /// Custom resources, no constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{1}; + vector cust_demands{5}; + vector cust_soft{false}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations == 0); + } + /// Custom resource missing, hard constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{100}; + vector cust_demands{5}; + vector cust_soft{false}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id == -1); + } + /// Custom resource missing, soft constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{100}; + vector cust_demands{5}; + vector cust_soft{true}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + EmptyIntVector); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations > 0); + } + /// Placement_hints, soft constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{1}; + vector cust_demands{5}; + vector cust_soft{true}; + vector placement_hints{2, 3}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + placement_hints); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations > 0); + } + /// Placement hints, no constraint violation. + { + TaskRequest task_req; + vector pred_demands{5, 2}; + vector pred_soft{false, true}; + vector cust_ids{1}; + vector cust_demands{5}; + vector cust_soft{true}; + vector placement_hints{1, 2, 3}; + initTaskRequest(task_req, pred_demands, pred_soft, cust_ids, cust_demands, cust_soft, + placement_hints); + int64_t violations; + int64_t node_id = cluster_resources.GetBestSchedulableNode(task_req, &violations); + ASSERT_TRUE(node_id != -1); + ASSERT_TRUE(violations == 0); + } +} + +#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION +TEST_F(SchedulingTest, SchedulingMapPerformanceTest) { + size_t map_len = 1000000; + unordered_map umap_int_key; + unordered_map umap_string_key; + absl::flat_hash_map amap_int_key; + absl::flat_hash_map amap_string_key; + vector search_key_strings; + vector search_key_ints; + + for (size_t i = 0; i < map_len; i++) { + int id = rand() % map_len; + search_key_strings.push_back(to_string(id)); + search_key_ints.push_back(id); + umap_int_key.emplace(i, i); + umap_string_key.emplace(to_string(i), i); + amap_int_key.emplace(i, i); + amap_string_key.emplace(to_string(i), i); + } + + for (size_t i = 0; i < 25; i++) { + cout << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" << endl; + } + + int64_t sum; + + auto t_start = std::chrono::high_resolution_clock::now(); + sum = 0; + for (size_t i = 0; i < map_len; i++) { + auto it = umap_int_key.find(search_key_ints[i]); + if (it != umap_int_key.end()) { + sum += it->second; + } + } + auto t_end = std::chrono::high_resolution_clock::now(); + double duration = std::chrono::duration(t_end - t_start).count(); + cout << "sum = " << sum << " in " << duration << endl; + + t_start = std::chrono::high_resolution_clock::now(); + sum = 0; + for (size_t i = 0; i < map_len; i++) { + auto it = umap_string_key.find(search_key_strings[i]); + if (it != umap_string_key.end()) { + sum += it->second; + } + } + t_end = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration(t_end - t_start).count(); + cout << "sum = " << sum << " in " << duration << endl; + + t_start = std::chrono::high_resolution_clock::now(); + sum = 0; + for (size_t i = 0; i < map_len; i++) { + auto it = amap_int_key.find(search_key_ints[i]); + if (it != amap_int_key.end()) { + sum += it->second; + } + } + t_end = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration(t_end - t_start).count(); + cout << "sum = " << sum << " in " << duration << endl; + + t_start = std::chrono::high_resolution_clock::now(); + sum = 0; + for (size_t i = 0; i < map_len; i++) { + auto it = amap_string_key.find(search_key_strings[i]); + if (it != amap_string_key.end()) { + sum += it->second; + } + } + t_end = std::chrono::high_resolution_clock::now(); + duration = std::chrono::duration(t_end - t_start).count(); + cout << "sum = " << sum << " in " << duration << endl; +} +#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION + } // namespace ray int main(int argc, char **argv) {