diff --git a/BUILD.bazel b/BUILD.bazel index cfc90ec4a..86b3ed399 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -596,7 +596,7 @@ cc_library( "src/ray/raylet/**/*.cc", ], exclude = [ - "src/ray/raylet/*_test.cc", + "src/ray/raylet/**/*_test.cc", "src/ray/raylet/main.cc", ], ), @@ -680,7 +680,7 @@ cc_library( "src/ray/rpc/worker/*.cc", ], exclude = [ - "src/ray/core_worker/*_test.cc", + "src/ray/core_worker/**/*_test.cc", "src/ray/core_worker/mock_worker.cc", ], ), @@ -831,8 +831,22 @@ cc_test( ) cc_test( - name = "scheduling_test", - srcs = ["src/ray/raylet/scheduling/scheduling_test.cc"], + name = "cluster_resource_scheduler_test", + srcs = [ + "src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc", + ], + copts = COPTS, + deps = [ + ":raylet_lib", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "cluster_task_manager_test", + srcs = [ + "src/ray/raylet/scheduling/cluster_task_manager_test.cc", + ], copts = COPTS, deps = [ ":raylet_lib", diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index dec6d531e..37ab99e72 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -204,6 +204,12 @@ TaskID RandomTaskId() { return TaskID::FromBinary(data); } +JobID RandomJobId() { + std::string data(JobID::Size(), 0); + FillRandom(&data); + return JobID::FromBinary(data); +} + std::shared_ptr GenerateRandomBuffer() { auto seed = std::chrono::high_resolution_clock::now().time_since_epoch().count(); std::mt19937 gen(seed); diff --git a/src/ray/common/test_util.h b/src/ray/common/test_util.h index 64b802bbb..6a05a8ef5 100644 --- a/src/ray/common/test_util.h +++ b/src/ray/common/test_util.h @@ -60,6 +60,9 @@ int KillAllExecutable(const std::string &executable_with_suffix); // A helper function to return a random task id. TaskID RandomTaskId(); +// A helper function to return a random job id. +JobID RandomJobId(); + std::shared_ptr GenerateRandomBuffer(); std::shared_ptr GenerateRandomObject( diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3244bf146..dbc6ef025 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1730,7 +1730,9 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest if (new_scheduler_enabled_) { auto task_spec = task.GetTaskSpecification(); - cluster_task_manager_->QueueTask(task, reply, send_reply_callback); + cluster_task_manager_->QueueTask(task, reply, [send_reply_callback]() { + send_reply_callback(Status::OK(), nullptr, nullptr); + }); ScheduleAndDispatch(); return; } diff --git a/src/ray/raylet/scheduling/scheduling_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc similarity index 96% rename from src/ray/raylet/scheduling/scheduling_test.cc rename to src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index d7d9d9802..fff1c54f4 100644 --- a/src/ray/raylet/scheduling/scheduling_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" + #include #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "ray/raylet/scheduling/cluster_resource_scheduler.h" #include "ray/raylet/scheduling/scheduling_ids.h" #ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION @@ -173,14 +174,14 @@ bool nodeResourcesEqual(const NodeResources &nr1, const NodeResources &nr2) { namespace ray { -class SchedulingTest : public ::testing::Test { +class ClusterResourceSchedulerTest : public ::testing::Test { public: void SetUp() {} void Shutdown() {} }; -TEST_F(SchedulingTest, SchedulingFixedPointTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingFixedPointTest) { { FixedPoint fp(1.); FixedPoint fp1(1.); @@ -225,7 +226,7 @@ TEST_F(SchedulingTest, SchedulingFixedPointTest) { } } -TEST_F(SchedulingTest, SchedulingIdTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingIdTest) { StringIdMap ids; hash hasher; size_t num = 10; // should be greater than 10. @@ -241,7 +242,7 @@ TEST_F(SchedulingTest, SchedulingIdTest) { ids.Remove(hasher(to_string(2))); ASSERT_EQ(ids.Count(), num - 2); - ASSERT_TRUE(ids.Get(to_string(3)) == static_cast(hasher(to_string(3)))); + ASSERT_EQ(ids.Get(to_string(3)), static_cast(hasher(to_string(3)))); ASSERT_TRUE(ids.Get(to_string(100)) == -1); @@ -255,7 +256,7 @@ TEST_F(SchedulingTest, SchedulingIdTest) { ASSERT_EQ(short_ids.Count(), max_id); } -TEST_F(SchedulingTest, SchedulingInitClusterTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) { int num_nodes = 10; ClusterResourceScheduler cluster_resources; @@ -264,7 +265,7 @@ TEST_F(SchedulingTest, SchedulingInitClusterTest) { ASSERT_EQ(cluster_resources.NumNodes(), num_nodes); } -TEST_F(SchedulingTest, SchedulingDeleteClusterNodeTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) { int num_nodes = 4; int64_t remove_id = 2; @@ -276,7 +277,7 @@ TEST_F(SchedulingTest, SchedulingDeleteClusterNodeTest) { ASSERT_TRUE(num_nodes - 1 == cluster_resources.NumNodes()); } -TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) { int num_nodes = 4; int64_t update_id = 2; ClusterResourceScheduler cluster_resources; @@ -310,7 +311,7 @@ TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) { ASSERT_TRUE(num_nodes == cluster_resources.NumNodes()); } -TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { // Create cluster resources. NodeResources node_resources; vector pred_capacities{10, 5, 3}; @@ -360,7 +361,7 @@ TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) { } } -TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) { ClusterResourceScheduler cluster_resources; NodeResources nr, nr_out; int64_t node_id = 1; @@ -400,7 +401,7 @@ TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) { } } -TEST_F(SchedulingTest, SchedulingTaskRequestTest) { +TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) { // Create cluster resources containing local node. NodeResources node_resources; vector pred_capacities{5, 5}; @@ -562,7 +563,7 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) { } } -TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) { +TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) { // Create cluster resources containing local node. NodeResources node_resources; vector pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */}; @@ -586,7 +587,7 @@ TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) { ASSERT_EQ(expected_cluster_resources == available_cluster_resources, true); } -TEST_F(SchedulingTest, GetCPUInstancesDoubleTest) { +TEST_F(ClusterResourceSchedulerTest, GetCPUInstancesDoubleTest) { TaskResourceInstances task_resources; addTaskResourceInstances(true, {1., 1., 1.}, CPU, &task_resources); addTaskResourceInstances(true, {4.}, MEM, &task_resources); @@ -598,7 +599,7 @@ TEST_F(SchedulingTest, GetCPUInstancesDoubleTest) { ASSERT_EQ(EqualVectors(cpu_instances, expected_cpu_instances), true); } -TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) { +TEST_F(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest) { NodeResources node_resources; vector pred_capacities{3 /* CPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, @@ -629,7 +630,7 @@ TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) { ASSERT_EQ(EqualVectors(instances.available, expected_available), true); } -TEST_F(SchedulingTest, TaskResourceInstancesTest) { +TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { // Allocate resources for a task request specifying only predefined resources. { NodeResources node_resources; @@ -810,7 +811,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) { } } -TEST_F(SchedulingTest, TaskResourceInstancesTest2) { +TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { { NodeResources node_resources; vector pred_capacities{4. /* CPU */, 4. /* MEM */, 5. /* GPU */}; @@ -842,7 +843,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest2) { } } -TEST_F(SchedulingTest, TaskGPUResourceInstancesTest) { +TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { { NodeResources node_resources; vector pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */}; @@ -895,7 +896,8 @@ TEST_F(SchedulingTest, TaskGPUResourceInstancesTest) { } } -TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) { +TEST_F(ClusterResourceSchedulerTest, + UpdateLocalAvailableResourcesFromResourceInstancesTest) { { NodeResources node_resources; vector pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */}; @@ -942,7 +944,7 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) { } } -TEST_F(SchedulingTest, TaskResourceInstanceWithHardRequestTest) { +TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) { NodeResources node_resources; vector pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */}; initNodeResources(node_resources, pred_capacities, EmptyIntVector, diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index d1e85ea7b..47af947d3 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -132,8 +132,8 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( } void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) { - Work work = std::make_tuple(task, reply, send_reply_callback); + std::function callback) { + Work work = std::make_tuple(task, reply, callback); tasks_to_schedule_.push_back(work); } @@ -151,7 +151,7 @@ void ClusterTaskManager::Dispatch( std::shared_ptr worker, std::unordered_map> &leased_workers_, const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) { + std::function send_reply_callback) { reply->mutable_worker_address()->set_ip_address(worker->IpAddress()); reply->mutable_worker_address()->set_port(worker->Port()); reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary()); @@ -202,16 +202,16 @@ void ClusterTaskManager::Dispatch( } } } - send_reply_callback(Status::OK(), nullptr, nullptr); + send_reply_callback(); } void ClusterTaskManager::Spillback(ClientID spillback_to, std::string address, int port, rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback) { + std::function send_reply_callback) { reply->mutable_retry_at_raylet_address()->set_ip_address(address); reply->mutable_retry_at_raylet_address()->set_port(port); reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); - send_reply_callback(Status::OK(), nullptr, nullptr); + send_reply_callback(); } } // namespace raylet diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 85ccb0bc2..a173de753 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -12,7 +12,10 @@ namespace ray { namespace raylet { -typedef std::tuple Work; +/// Work represents all the information needed to make a scheduling decision. +/// This includes the task, the information we need to communicate to +/// dispatch/spillback and the callback to trigger it. +typedef std::tuple> Work; typedef std::function(const ClientID &node_id)> NodeInfoGetter; @@ -68,7 +71,7 @@ class ClusterTaskManager { /// \param fn: The function used during dispatching. /// \param task: The incoming task to schedule. void QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback); + std::function); /// Move tasks from waiting to ready for dispatch. Called when a task's /// dependencies are resolved. @@ -100,11 +103,11 @@ class ClusterTaskManager { std::shared_ptr worker, std::unordered_map> &leased_workers_, const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback); + std::function send_reply_callback); void Spillback(ClientID spillback_to, std::string address, int port, rpc::RequestWorkerLeaseReply *reply, - rpc::SendReplyCallback send_reply_callback); + std::function send_reply_callback); }; } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc new file mode 100644 index 000000000..290cd7fa3 --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -0,0 +1,343 @@ +// Copyright 2017 The Ray Authors. +// + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/raylet/scheduling/cluster_task_manager.h" + +#include +#include + +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "ray/common/id.h" +#include "ray/common/task/scheduling_resources.h" +#include "ray/common/task/task.h" +#include "ray/common/task/task_util.h" +#include "ray/common/test_util.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/raylet/scheduling/scheduling_ids.h" + +#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION +#include + +#include "absl/container/flat_hash_map.h" +#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION + +namespace ray { + +namespace raylet { + +class MockWorkerPool : public WorkerPoolInterface { + public: + std::shared_ptr PopWorker(const TaskSpecification &task_spec) { + if (workers.empty()) { + return nullptr; + } + auto worker_ptr = workers.front(); + workers.pop_front(); + return worker_ptr; + } + + void PushWorker(const std::shared_ptr &worker) { + workers.push_front(worker); + } + + std::list> workers; +}; + +class MockWorker : public WorkerInterface { + public: + MockWorker(WorkerID worker_id, int port) : worker_id_(worker_id), port_(port) {} + + WorkerID WorkerId() const { return worker_id_; } + + int Port() const { return port_; } + + void SetOwnerAddress(const rpc::Address &address) { address_ = address; } + + void AssignTaskId(const TaskID &task_id) {} + + void AssignJobId(const JobID &job_id) {} + + void SetAssignedTask(Task &assigned_task) {} + + const std::string IpAddress() const { return address_.ip_address(); } + + void SetAllocatedInstances( + std::shared_ptr &allocated_instances) { + allocated_instances_ = allocated_instances; + } + + void SetLifetimeAllocatedInstances( + std::shared_ptr &allocated_instances) { + lifetime_allocated_instances_ = allocated_instances; + } + + std::shared_ptr GetAllocatedInstances() { + return allocated_instances_; + } + std::shared_ptr GetLifetimeAllocatedInstances() { + return lifetime_allocated_instances_; + } + + void MarkDead() { RAY_CHECK(false) << "Method unused"; } + bool IsDead() const { + RAY_CHECK(false) << "Method unused"; + return false; + } + void MarkBlocked() { RAY_CHECK(false) << "Method unused"; } + void MarkUnblocked() { RAY_CHECK(false) << "Method unused"; } + bool IsBlocked() const { + RAY_CHECK(false) << "Method unused"; + return false; + } + + Process GetProcess() const { + RAY_CHECK(false) << "Method unused"; + return Process::CreateNewDummy(); + } + void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; } + Language GetLanguage() const { + RAY_CHECK(false) << "Method unused"; + return Language::PYTHON; + } + + void Connect(int port) { RAY_CHECK(false) << "Method unused"; } + + int AssignedPort() const { + RAY_CHECK(false) << "Method unused"; + return -1; + } + void SetAssignedPort(int port) { RAY_CHECK(false) << "Method unused"; } + const TaskID &GetAssignedTaskId() const { + RAY_CHECK(false) << "Method unused"; + return TaskID::Nil(); + } + bool AddBlockedTaskId(const TaskID &task_id) { + RAY_CHECK(false) << "Method unused"; + return false; + } + bool RemoveBlockedTaskId(const TaskID &task_id) { + RAY_CHECK(false) << "Method unused"; + return false; + } + const std::unordered_set &GetBlockedTaskIds() const { + RAY_CHECK(false) << "Method unused"; + auto *t = new std::unordered_set(); + return *t; + } + const JobID &GetAssignedJobId() const { + RAY_CHECK(false) << "Method unused"; + return JobID::Nil(); + } + void AssignActorId(const ActorID &actor_id) { RAY_CHECK(false) << "Method unused"; } + const ActorID &GetActorId() const { + RAY_CHECK(false) << "Method unused"; + return ActorID::Nil(); + } + void MarkDetachedActor() { RAY_CHECK(false) << "Method unused"; } + bool IsDetachedActor() const { + RAY_CHECK(false) << "Method unused"; + return false; + } + const std::shared_ptr Connection() const { + RAY_CHECK(false) << "Method unused"; + return nullptr; + } + const rpc::Address &GetOwnerAddress() const { + RAY_CHECK(false) << "Method unused"; + return address_; + } + + const ResourceIdSet &GetLifetimeResourceIds() const { + RAY_CHECK(false) << "Method unused"; + auto *t = new ResourceIdSet(); + return *t; + } + void SetLifetimeResourceIds(ResourceIdSet &resource_ids) { + RAY_CHECK(false) << "Method unused"; + } + void ResetLifetimeResourceIds() { RAY_CHECK(false) << "Method unused"; } + + const ResourceIdSet &GetTaskResourceIds() const { + RAY_CHECK(false) << "Method unused"; + auto *t = new ResourceIdSet(); + return *t; + } + void SetTaskResourceIds(ResourceIdSet &resource_ids) { + RAY_CHECK(false) << "Method unused"; + } + void ResetTaskResourceIds() { RAY_CHECK(false) << "Method unused"; } + ResourceIdSet ReleaseTaskCpuResources() { + RAY_CHECK(false) << "Method unused"; + auto *t = new ResourceIdSet(); + return *t; + } + void AcquireTaskCpuResources(const ResourceIdSet &cpu_resources) { + RAY_CHECK(false) << "Method unused"; + } + + Status AssignTask(const Task &task, const ResourceIdSet &resource_id_set) { + RAY_CHECK(false) << "Method unused"; + Status s; + return s; + } + void DirectActorCallArgWaitComplete(int64_t tag) { + RAY_CHECK(false) << "Method unused"; + } + + void ClearAllocatedInstances() { RAY_CHECK(false) << "Method unused"; } + + void ClearLifetimeAllocatedInstances() { RAY_CHECK(false) << "Method unused"; } + + void SetBorrowedCPUInstances(std::vector &cpu_instances) { + RAY_CHECK(false) << "Method unused"; + } + + std::vector &GetBorrowedCPUInstances() { + RAY_CHECK(false) << "Method unused"; + auto *t = new std::vector(); + return *t; + } + + void ClearBorrowedCPUInstances() { RAY_CHECK(false) << "Method unused"; } + + Task &GetAssignedTask() { + RAY_CHECK(false) << "Method unused"; + auto *t = new Task(); + return *t; + } + + bool IsRegistered() { + RAY_CHECK(false) << "Method unused"; + return false; + } + + rpc::CoreWorkerClient *rpc_client() { + RAY_CHECK(false) << "Method unused"; + return nullptr; + } + + private: + WorkerID worker_id_; + int port_; + rpc::Address address_; + std::shared_ptr allocated_instances_; + std::shared_ptr lifetime_allocated_instances_; +}; + +std::shared_ptr CreateSingleNodeScheduler( + const std::string &id) { + std::unordered_map local_node_resources; + local_node_resources[ray::kCPU_ResourceLabel] = 8; + local_node_resources[ray::kGPU_ResourceLabel] = 4; + local_node_resources[ray::kMemory_ResourceLabel] = 128; + + auto scheduler = std::make_shared( + ClusterResourceScheduler(id, local_node_resources)); + + return scheduler; +} + +Task CreateTask(const std::unordered_map &required_resources) { + TaskSpecBuilder spec_builder; + TaskID id = RandomTaskId(); + JobID job_id = RandomJobId(); + rpc::Address address; + spec_builder.SetCommonTaskSpec( + id, Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), + job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, required_resources, {}); + rpc::TaskExecutionSpec execution_spec_message; + execution_spec_message.set_num_forwards(1); + return Task(spec_builder.Build(), TaskExecutionSpecification(execution_spec_message)); +} + +class ClusterTaskManagerTest : public ::testing::Test { + public: + ClusterTaskManagerTest() + : id_(ClientID::FromRandom()), + single_node_resource_scheduler_(CreateSingleNodeScheduler(id_.Binary())), + fulfills_dependencies_calls_(0), + dependencies_fulfilled_(true), + node_info_calls_(0), + node_info_(boost::optional{}), + task_manager_(id_, single_node_resource_scheduler_, + [this](const Task &_task) { + fulfills_dependencies_calls_++; + return dependencies_fulfilled_; + }, + [this](const ClientID &node_id) { + node_info_calls_++; + return node_info_; + }) {} + + void SetUp() {} + + void Shutdown() {} + + ClientID id_; + std::shared_ptr single_node_resource_scheduler_; + MockWorkerPool pool_; + std::unordered_map> leased_workers_; + + int fulfills_dependencies_calls_; + bool dependencies_fulfilled_; + + int node_info_calls_; + boost::optional node_info_; + + ClusterTaskManager task_manager_; +}; + +TEST_F(ClusterTaskManagerTest, BasicTest) { + /* + Test basic scheduler functionality: + 1. Queue and attempt to schedule/dispatch atest with no workers available + 2. A worker becomes available, dispatch again. + */ + Task task = CreateTask({{ray::kCPU_ResourceLabel, 4}}); + rpc::RequestWorkerLeaseReply reply; + bool callback_occurred = false; + bool *callback_occurred_ptr = &callback_occurred; + auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; + + task_manager_.QueueTask(task, &reply, callback); + task_manager_.SchedulePendingTasks(); + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + + ASSERT_FALSE(callback_occurred); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 0); + + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); + + task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + + ASSERT_EQ(callback_occurred, true); + ASSERT_EQ(leased_workers_.size(), 1); + ASSERT_EQ(pool_.workers.size(), 0); + ASSERT_EQ(fulfills_dependencies_calls_, 0); + ASSERT_EQ(node_info_calls_, 0); +} + +} // namespace raylet + +} // namespace ray + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}