From a866be381c9cb2bcf8aa0b4e82e0b2608232e8d7 Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Thu, 1 Oct 2020 15:54:53 -0700 Subject: [PATCH] [New Scheduler] Heartbeat (#11024) * . * refactor * . * . * done? * . * . * . * lint * no light heartbeat, no tests, fields 2,3 * . * manually clang format :( * . * . * test * . * . * task manager heartbeat * lint * . * add reminder * CR * CR * cleanup * CR * comment * lint * . --- src/ray/raylet/node_manager.cc | 88 ++++++++++--------- .../scheduling/cluster_resource_data.cc | 13 +++ .../raylet/scheduling/cluster_resource_data.h | 2 + .../scheduling/cluster_resource_scheduler.cc | 46 +++++++++- .../scheduling/cluster_resource_scheduler.h | 20 ++++- .../cluster_resource_scheduler_test.cc | 80 ++++++++++++++++- .../raylet/scheduling/cluster_task_manager.cc | 70 ++++++++++++++- .../raylet/scheduling/cluster_task_manager.h | 10 +++ .../scheduling/cluster_task_manager_test.cc | 80 +++++++++++++++++ src/ray/raylet/scheduling/fixed_point.cc | 2 +- src/ray/raylet/scheduling/fixed_point.h | 2 +- src/ray/raylet/scheduling/scheduling_ids.cc | 4 +- src/ray/raylet/scheduling/scheduling_ids.h | 4 +- 13 files changed, 369 insertions(+), 52 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 5785a8cb8..cf0fff2aa 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -391,64 +391,70 @@ void NodeManager::Heartbeat() { SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; heartbeat_data->set_client_id(self_node_id_.Binary()); - // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet directly. - // TODO(atumanov): implement a ResourceSet const_iterator. - // If light heartbeat enabled, we only set filed that represent resources changed. - if (light_heartbeat_enabled_) { - if (!last_heartbeat_resources_.GetTotalResources().IsEqual( - local_resources.GetTotalResources())) { + if (new_scheduler_enabled_) { + new_resource_scheduler_->Heartbeat(light_heartbeat_enabled_, heartbeat_data); + cluster_task_manager_->Heartbeat(light_heartbeat_enabled_, heartbeat_data); + } else { + // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet + // directly. + // TODO(atumanov): implement a ResourceSet const_iterator. + // If light heartbeat enabled, we only set filed that represent resources changed. + if (light_heartbeat_enabled_) { + if (!last_heartbeat_resources_.GetTotalResources().IsEqual( + local_resources.GetTotalResources())) { + for (const auto &resource_pair : + local_resources.GetTotalResources().GetResourceMap()) { + (*heartbeat_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources_.SetTotalResources( + ResourceSet(local_resources.GetTotalResources())); + } + + if (!last_heartbeat_resources_.GetAvailableResources().IsEqual( + local_resources.GetAvailableResources())) { + heartbeat_data->set_resources_available_changed(true); + for (const auto &resource_pair : + local_resources.GetAvailableResources().GetResourceMap()) { + (*heartbeat_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources_.SetAvailableResources( + ResourceSet(local_resources.GetAvailableResources())); + } + + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); + if (!last_heartbeat_resources_.GetLoadResources().IsEqual( + local_resources.GetLoadResources())) { + heartbeat_data->set_resource_load_changed(true); + for (const auto &resource_pair : + local_resources.GetLoadResources().GetResourceMap()) { + (*heartbeat_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources_.SetLoadResources( + ResourceSet(local_resources.GetLoadResources())); + } + } else { + // If light heartbeat disabled, we send whole resources information every time. for (const auto &resource_pair : local_resources.GetTotalResources().GetResourceMap()) { (*heartbeat_data->mutable_resources_total())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetTotalResources( - ResourceSet(local_resources.GetTotalResources())); - } - if (!last_heartbeat_resources_.GetAvailableResources().IsEqual( - local_resources.GetAvailableResources())) { - heartbeat_data->set_resources_available_changed(true); for (const auto &resource_pair : local_resources.GetAvailableResources().GetResourceMap()) { (*heartbeat_data->mutable_resources_available())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetAvailableResources( - ResourceSet(local_resources.GetAvailableResources())); - } - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - if (!last_heartbeat_resources_.GetLoadResources().IsEqual( - local_resources.GetLoadResources())) { - heartbeat_data->set_resource_load_changed(true); + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { (*heartbeat_data->mutable_resource_load())[resource_pair.first] = resource_pair.second; } - last_heartbeat_resources_.SetLoadResources( - ResourceSet(local_resources.GetLoadResources())); - } - } else { - // If light heartbeat disabled, we send whole resources information every time. - for (const auto &resource_pair : - local_resources.GetTotalResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*heartbeat_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - for (const auto &resource_pair : - local_resources.GetLoadResources().GetResourceMap()) { - (*heartbeat_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; } } diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index d082f0c2e..926aa4305 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -1,5 +1,18 @@ #include "ray/raylet/scheduling/cluster_resource_data.h" +const std::string resource_labels[] = {ray::kCPU_ResourceLabel, + ray::kMemory_ResourceLabel, + ray::kGPU_ResourceLabel, ray::kTPU_ResourceLabel}; + +const std::string ResourceEnumToString(PredefinedResources resource) { + // TODO (Alex): We should replace this with a protobuf enum. + RAY_CHECK(resource < PredefinedResources_MAX) + << "Something went wrong. Please file a bug report with this stack " + "trace: https://github.com/ray-project/ray/issues/new."; + std::string label = resource_labels[resource]; + return label; +} + std::string VectorToString(const std::vector &vector) { std::stringstream buffer; diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index 27a45fc4a..ef2bdf714 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -28,6 +28,8 @@ /// List of predefined resources. enum PredefinedResources { CPU, MEM, GPU, TPU, PredefinedResources_MAX }; +const std::string ResourceEnumToString(PredefinedResources resource); + /// Helper function to compare two vectors with FixedPoint values. bool EqualVectors(const std::vector &v1, const std::vector &v2); diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index f8cb4ae26..1e48ab1fc 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -14,6 +14,8 @@ #include "ray/raylet/scheduling/cluster_resource_scheduler.h" +namespace ray { + ClusterResourceScheduler::ClusterResourceScheduler( int64_t local_node_id, const NodeResources &local_node_resources) : local_node_id_(local_node_id) { @@ -294,7 +296,7 @@ bool ClusterResourceScheduler::AddNodeAvailableResources( } bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, - NodeResources *ret_resources) { + NodeResources *ret_resources) const { auto it = nodes_.find(node_id); if (it != nodes_.end()) { *ret_resources = it->second; @@ -779,3 +781,45 @@ void ClusterResourceScheduler::FreeLocalTaskResources( FreeTaskResourceInstances(task_allocation); UpdateLocalAvailableResourcesFromResourceInstances(); } + +void ClusterResourceScheduler::Heartbeat( + bool light_heartbeat_enabled, + std::shared_ptr heartbeat_data) const { + NodeResources resources; + + RAY_CHECK(GetNodeResources(local_node_id_, &resources)) + << "Error: Populating heartbeat failed. Please file a bug report: " + "https://github.com/ray-project/ray/issues/new."; + + if (light_heartbeat_enabled) { + // TODO + RAY_CHECK(false) << "TODO"; + } else { + for (int i = 0; i < PredefinedResources_MAX; i++) { + const auto &label = ResourceEnumToString((PredefinedResources)i); + const auto &capacity = resources.predefined_resources[i]; + if (capacity.available != 0) { + (*heartbeat_data->mutable_resources_available())[label] = + capacity.available.Double(); + } + if (capacity.total != 0) { + (*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double(); + } + } + for (auto it = resources.custom_resources.begin(); + it != resources.custom_resources.end(); it++) { + uint64_t custom_id = it->first; + const auto &capacity = it->second; + const auto &label = string_to_int_map_.Get(custom_id); + if (capacity.available != 0) { + (*heartbeat_data->mutable_resources_available())[label] = + capacity.available.Double(); + } + if (capacity.total != 0) { + (*heartbeat_data->mutable_resources_total())[label] = capacity.total.Double(); + } + } + } +} + +} // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index bfb9f984b..1b4d584b5 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -26,6 +26,12 @@ #include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/util/logging.h" +#include "src/ray/protobuf/gcs.pb.h" + +namespace ray { + +using rpc::HeartbeatTableData; + // Specify resources that consists of unit-size instances. static std::unordered_set UnitInstanceResources{CPU, GPU, TPU}; @@ -185,7 +191,7 @@ class ClusterResourceScheduler { /// Return resources associated to the given node_id in ret_resources. /// If node_id not found, return false; otherwise return true. - bool GetNodeResources(int64_t node_id, NodeResources *ret_resources); + bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const; /// Get number of nodes in the cluster. int64_t NumNodes(); @@ -366,6 +372,18 @@ class ClusterResourceScheduler { // resources availabile at that node is 0.2 + 0.3 + 0.1 + 1. = 1.6 void UpdateLocalAvailableResourcesFromResourceInstances(); + /// Populate the relevant parts of the heartbeat table. This is intended for + /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// resources_available and resources_total. + /// + /// \param light_heartbeat_enabled Only send changed fields if true. + /// \param Output parameter. `resources_available` and `resources_total` are the only + /// fields used. + void Heartbeat(bool light_heartbeat_enabled, + std::shared_ptr data) const; + /// Return human-readable string for this scheduler state. std::string DebugString() const; }; + +} // end namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index fff1c54f4..31e252dad 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -18,6 +18,7 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "ray/common/task/scheduling_resources.h" #include "ray/raylet/scheduling/scheduling_ids.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION @@ -28,6 +29,7 @@ using namespace std; +namespace ray { // Used to path empty vector argiuments. vector EmptyIntVector; vector EmptyBoolVector; @@ -172,8 +174,6 @@ bool nodeResourcesEqual(const NodeResources &nr1, const NodeResources &nr2) { return true; } -namespace ray { - class ClusterResourceSchedulerTest : public ::testing::Test { public: void SetUp() {} @@ -970,6 +970,82 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { ASSERT_TRUE(EqualVectors(cpu_instances, expect_cpu_instance)); } +TEST_F(ClusterResourceSchedulerTest, HeartbeatTest) { + vector cust_ids{1, 2, 3, 4, 5}; + + NodeResources node_resources; + + std::unordered_map initial_resources( + {{"CPU", 1}, {"GPU", 2}, {"memory", 3}, {"1", 1}, {"2", 2}, {"3", 3}}); + ClusterResourceScheduler cluster_resources("0", initial_resources); + NodeResources other_node_resources; + vector other_pred_capacities{1. /* CPU */, 1. /* MEM */, 1. /* GPU */}; + vector other_cust_capacities{5., 4., 3., 2., 1.}; + initNodeResources(other_node_resources, other_pred_capacities, cust_ids, + other_cust_capacities); + cluster_resources.AddOrUpdateNode(12345, other_node_resources); + + { // Cluster is idle. + auto data = std::make_shared(); + cluster_resources.Heartbeat(false, data); + + auto available = data->resources_available(); + auto total = data->resources_total(); + + ASSERT_EQ(available[kCPU_ResourceLabel], 1); + ASSERT_EQ(available[kGPU_ResourceLabel], 2); + ASSERT_EQ(available[kMemory_ResourceLabel], 3); + ASSERT_EQ(available["1"], 1); + ASSERT_EQ(available["2"], 2); + ASSERT_EQ(available["3"], 3); + + ASSERT_EQ(total[kCPU_ResourceLabel], 1); + ASSERT_EQ(total[kGPU_ResourceLabel], 2); + ASSERT_EQ(total[kMemory_ResourceLabel], 3); + ASSERT_EQ(total["1"], 1); + ASSERT_EQ(total["2"], 2); + ASSERT_EQ(total["3"], 3); + + // GCS doesn't like entries which are 0 (like TPU). + ASSERT_EQ(available.size(), 6); + ASSERT_EQ(total.size(), 6); + } + { // Task running on node with {"CPU": 0.1, "1": 0.1} + std::shared_ptr allocations = + std::make_shared(); + allocations->predefined_resources = { + {0.1}, // CPU + }; + allocations->custom_resources = { + {1, {0.1}}, // "1" + }; + std::unordered_map allocation_map({ + {"CPU", 0.1}, + {"1", 0.1}, + }); + cluster_resources.AllocateLocalTaskResources(allocation_map, allocations); + auto data = std::make_shared(); + cluster_resources.Heartbeat(false, data); + + auto available = data->resources_available(); + auto total = data->resources_total(); + + ASSERT_EQ(available[kCPU_ResourceLabel], 0.9); + ASSERT_EQ(available[kGPU_ResourceLabel], 2); + ASSERT_EQ(available[kMemory_ResourceLabel], 3); + ASSERT_EQ(available["1"], 0.9); + ASSERT_EQ(available["2"], 2); + ASSERT_EQ(available["3"], 3); + + ASSERT_EQ(total[kCPU_ResourceLabel], 1); + ASSERT_EQ(total[kGPU_ResourceLabel], 2); + ASSERT_EQ(total[kMemory_ResourceLabel], 3); + ASSERT_EQ(total["1"], 1); + ASSERT_EQ(total["2"], 2); + ASSERT_EQ(total["3"], 3); + } +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 349bf76b6..05c5e0520 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -1,5 +1,6 @@ -#include "ray/raylet/scheduling/cluster_task_manager.h" +#include +#include "ray/raylet/scheduling/cluster_task_manager.h" #include "ray/util/logging.h" namespace ray { @@ -184,6 +185,73 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { return false; } +void ClusterTaskManager::Heartbeat(bool light_heartbeat_enabled, + std::shared_ptr data) const { + auto resource_loads = data->mutable_resource_load(); + auto resource_load_by_shape = + data->mutable_resource_load_by_shape()->mutable_resource_demands(); + + if (light_heartbeat_enabled) { + RAY_CHECK(false) << "TODO"; + } else { + // TODO (Alex): Implement the 1-CPU task optimization. + for (const auto &work : tasks_to_schedule_) { + const auto &task = std::get<0>(work); + const auto &resources = + task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + + auto by_shape_entry = resource_load_by_shape->Add(); + + for (const auto &resource : resources) { + // Add to `resource_loads`. + const auto &label = resource.first; + const auto &quantity = resource.second; + const auto &entry = resource_loads->find(label); + if (entry == resource_loads->end()) { + (*resource_loads)[label] = quantity; + } else { + (*resource_loads)[label] = entry->second + quantity; + } + + // TODO (Alex): Adding repeated entries with quantity 1 is fine, but inefficient. + // Add to `resource_load_by_shape`. + (*by_shape_entry->mutable_shape())[label] = quantity; + // TODO (Alex): Technically being on `tasks_to_schedule` could also mean + // that the entire cluster is utilized. + by_shape_entry->set_num_infeasible_requests_queued(1); + } + } + + for (const auto &work : tasks_to_dispatch_) { + const auto &task = std::get<0>(work); + const auto &resources = + task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + + auto by_shape_entry = resource_load_by_shape->Add(); + + for (auto to_add_it = resources.begin(); to_add_it != resources.end(); + to_add_it++) { + // Add to `resource_loads`. + const auto &label = to_add_it->first; + const auto &quantity = to_add_it->second; + const auto &entry = resource_loads->find(label); + if (entry == resource_loads->end()) { + (*resource_loads)[label] = quantity; + } else { + (*resource_loads)[label] = entry->second + quantity; + } + + // TODO (Alex): Adding repeated entries with quantity 1 is fine, but inefficient. + // Add to `resource_load_by_shape`. + (*by_shape_entry->mutable_shape())[label] = quantity; + // TODO (Alex): Technically being on `tasks_to_schedule` could also mean + // that the entire cluster is utilized. + by_shape_entry->set_num_ready_requests_queued(1); + } + } + } +} + std::string ClusterTaskManager::DebugString() { std::stringstream buffer; buffer << "========== Node: " << self_node_id_ << " =================\n"; diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index d72852121..a4740b822 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -96,6 +96,16 @@ class ClusterTaskManager { /// false if the task is already running. bool CancelTask(const TaskID &task_id); + /// Populate the relevant parts of the heartbeat table. This is intended for + /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// resource_load and resource_load_by_shape. + /// + /// \param light_heartbeat_enabled Only send changed fields if true. + /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only + /// fields used. + void Heartbeat(bool light_heartbeat_enabled, + std::shared_ptr data) const; + std::string DebugString(); private: diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 8dc939a32..0aef43554 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -483,10 +483,90 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { ASSERT_EQ(leased_workers_.size(), 1); } +TEST_F(ClusterTaskManagerTest, HeartbeatTest) { + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); + + { + Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}}); + rpc::RequestWorkerLeaseReply reply; + + bool callback_called = false; + bool *callback_called_ptr = &callback_called; + auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + + task_manager_.QueueTask(task, &reply, callback); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + ASSERT_TRUE(callback_called); + // Now {CPU: 7, GPU: 4, MEM:128} + } + + { + Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}}); + rpc::RequestWorkerLeaseReply reply; + + bool callback_called = false; + bool *callback_called_ptr = &callback_called; + auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + + task_manager_.QueueTask(task, &reply, callback); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + ASSERT_FALSE(callback_called); // No worker available. + // Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task. + } + + { + Task task = CreateTask({{ray::kCPU_ResourceLabel, 9}, {ray::kGPU_ResourceLabel, 5}}); + rpc::RequestWorkerLeaseReply reply; + + bool callback_called = false; + bool *callback_called_ptr = &callback_called; + auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + + task_manager_.QueueTask(task, &reply, callback); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + ASSERT_FALSE(callback_called); // Infeasible. + // Now there is also an infeasible task {CPU: 9}. + } + + { + auto data = std::make_shared(); + task_manager_.Heartbeat(false, data); + + auto load = data->mutable_resource_load(); + ASSERT_EQ(load->size(), 2); + ASSERT_EQ((*load)["CPU"], 10); // 9 + 1 = 10 + ASSERT_EQ((*load)["GPU"], 5); + + auto load_by_shape = + data->mutable_resource_load_by_shape()->mutable_resource_demands(); + ASSERT_EQ(load_by_shape->size(), 2); + + auto load1 = (*load_by_shape)[0]; + auto load2 = (*load_by_shape)[1]; + + ASSERT_EQ(load1.num_infeasible_requests_queued(), 1); + ASSERT_EQ(load1.num_ready_requests_queued(), 0); + ASSERT_EQ((*load1.mutable_shape())["CPU"], 9); + ASSERT_EQ((*load1.mutable_shape())["GPU"], 5); + ASSERT_EQ((*load1.mutable_shape()).size(), 2); + + ASSERT_EQ(load2.num_infeasible_requests_queued(), 0); + ASSERT_EQ(load2.num_ready_requests_queued(), 1); + ASSERT_EQ((*load2.mutable_shape())["CPU"], 1); + ASSERT_EQ((*load2.mutable_shape()).size(), 1); + } +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } } // namespace raylet + } // namespace ray diff --git a/src/ray/raylet/scheduling/fixed_point.cc b/src/ray/raylet/scheduling/fixed_point.cc index e7e7c8741..514805393 100644 --- a/src/ray/raylet/scheduling/fixed_point.cc +++ b/src/ray/raylet/scheduling/fixed_point.cc @@ -67,4 +67,4 @@ std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1) { return out; } -double FixedPoint::Double() { return round(i_) / RESOURCE_UNIT_SCALING; }; +double FixedPoint::Double() const { return round(i_) / RESOURCE_UNIT_SCALING; }; diff --git a/src/ray/raylet/scheduling/fixed_point.h b/src/ray/raylet/scheduling/fixed_point.h index a975a0158..33ac50aaf 100644 --- a/src/ray/raylet/scheduling/fixed_point.h +++ b/src/ray/raylet/scheduling/fixed_point.h @@ -37,7 +37,7 @@ class FixedPoint { bool operator==(FixedPoint const &ru1) const; bool operator!=(FixedPoint const &ru1) const; - double Double(); + double Double() const; friend std::ostream &operator<<(std::ostream &out, FixedPoint const &ru1); }; diff --git a/src/ray/raylet/scheduling/scheduling_ids.cc b/src/ray/raylet/scheduling/scheduling_ids.cc index 345b854ee..e5c946091 100644 --- a/src/ray/raylet/scheduling/scheduling_ids.cc +++ b/src/ray/raylet/scheduling/scheduling_ids.cc @@ -14,7 +14,7 @@ #include "ray/raylet/scheduling/scheduling_ids.h" -int64_t StringIdMap::Get(const std::string &string_id) { +int64_t StringIdMap::Get(const std::string &string_id) const { auto it = string_to_int_.find(string_id); if (it == string_to_int_.end()) { return -1; @@ -23,7 +23,7 @@ int64_t StringIdMap::Get(const std::string &string_id) { } }; -std::string StringIdMap::Get(uint64_t id) { +std::string StringIdMap::Get(uint64_t id) const { std::string id_string; auto it = int_to_string_.find(id); if (it == int_to_string_.end()) { diff --git a/src/ray/raylet/scheduling/scheduling_ids.h b/src/ray/raylet/scheduling/scheduling_ids.h index 81cd71ba9..53d456e58 100644 --- a/src/ray/raylet/scheduling/scheduling_ids.h +++ b/src/ray/raylet/scheduling/scheduling_ids.h @@ -36,13 +36,13 @@ class StringIdMap { /// /// \param String ID. /// \return The integer ID associated with the given string ID. - int64_t Get(const std::string &string_id); + int64_t Get(const std::string &string_id) const; /// Get string ID associated with an existing integer ID. /// /// \param Integre ID. /// \return The string ID associated with the given integer ID. - std::string Get(uint64_t id); + std::string Get(uint64_t id) const; /// Insert a string ID and get the associated integer ID. ///