[New scheduler] First unit test for task manager (#9696)

* .

* .

* refactor WorkerInterface

* .

* Basic unit test structure complete?

* .

* bad git >:-(

* small clean up

* CR

* .

* .

* One more fixture

* One more fixture

* .

* .

* bazel-format

* .
This commit is contained in:
Alex Wu
2020-07-28 09:44:58 -07:00
committed by GitHub
parent b1c2983c97
commit feb3751824
8 changed files with 407 additions and 34 deletions
+18 -4
View File
@@ -596,7 +596,7 @@ cc_library(
"src/ray/raylet/**/*.cc",
],
exclude = [
"src/ray/raylet/*_test.cc",
"src/ray/raylet/**/*_test.cc",
"src/ray/raylet/main.cc",
],
),
@@ -680,7 +680,7 @@ cc_library(
"src/ray/rpc/worker/*.cc",
],
exclude = [
"src/ray/core_worker/*_test.cc",
"src/ray/core_worker/**/*_test.cc",
"src/ray/core_worker/mock_worker.cc",
],
),
@@ -831,8 +831,22 @@ cc_test(
)
cc_test(
name = "scheduling_test",
srcs = ["src/ray/raylet/scheduling/scheduling_test.cc"],
name = "cluster_resource_scheduler_test",
srcs = [
"src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc",
],
copts = COPTS,
deps = [
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "cluster_task_manager_test",
srcs = [
"src/ray/raylet/scheduling/cluster_task_manager_test.cc",
],
copts = COPTS,
deps = [
":raylet_lib",
+6
View File
@@ -204,6 +204,12 @@ TaskID RandomTaskId() {
return TaskID::FromBinary(data);
}
JobID RandomJobId() {
std::string data(JobID::Size(), 0);
FillRandom(&data);
return JobID::FromBinary(data);
}
std::shared_ptr<Buffer> GenerateRandomBuffer() {
auto seed = std::chrono::high_resolution_clock::now().time_since_epoch().count();
std::mt19937 gen(seed);
+3
View File
@@ -60,6 +60,9 @@ int KillAllExecutable(const std::string &executable_with_suffix);
// A helper function to return a random task id.
TaskID RandomTaskId();
// A helper function to return a random job id.
JobID RandomJobId();
std::shared_ptr<Buffer> GenerateRandomBuffer();
std::shared_ptr<RayObject> GenerateRandomObject(
+3 -1
View File
@@ -1730,7 +1730,9 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest
if (new_scheduler_enabled_) {
auto task_spec = task.GetTaskSpecification();
cluster_task_manager_->QueueTask(task, reply, send_reply_callback);
cluster_task_manager_->QueueTask(task, reply, [send_reply_callback]() {
send_reply_callback(Status::OK(), nullptr, nullptr);
});
ScheduleAndDispatch();
return;
}
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include <string>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
@@ -173,14 +174,14 @@ bool nodeResourcesEqual(const NodeResources &nr1, const NodeResources &nr2) {
namespace ray {
class SchedulingTest : public ::testing::Test {
class ClusterResourceSchedulerTest : public ::testing::Test {
public:
void SetUp() {}
void Shutdown() {}
};
TEST_F(SchedulingTest, SchedulingFixedPointTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingFixedPointTest) {
{
FixedPoint fp(1.);
FixedPoint fp1(1.);
@@ -225,7 +226,7 @@ TEST_F(SchedulingTest, SchedulingFixedPointTest) {
}
}
TEST_F(SchedulingTest, SchedulingIdTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingIdTest) {
StringIdMap ids;
hash<string> hasher;
size_t num = 10; // should be greater than 10.
@@ -241,7 +242,7 @@ TEST_F(SchedulingTest, SchedulingIdTest) {
ids.Remove(hasher(to_string(2)));
ASSERT_EQ(ids.Count(), num - 2);
ASSERT_TRUE(ids.Get(to_string(3)) == static_cast<int64_t>(hasher(to_string(3))));
ASSERT_EQ(ids.Get(to_string(3)), static_cast<int64_t>(hasher(to_string(3))));
ASSERT_TRUE(ids.Get(to_string(100)) == -1);
@@ -255,7 +256,7 @@ TEST_F(SchedulingTest, SchedulingIdTest) {
ASSERT_EQ(short_ids.Count(), max_id);
}
TEST_F(SchedulingTest, SchedulingInitClusterTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingInitClusterTest) {
int num_nodes = 10;
ClusterResourceScheduler cluster_resources;
@@ -264,7 +265,7 @@ TEST_F(SchedulingTest, SchedulingInitClusterTest) {
ASSERT_EQ(cluster_resources.NumNodes(), num_nodes);
}
TEST_F(SchedulingTest, SchedulingDeleteClusterNodeTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingDeleteClusterNodeTest) {
int num_nodes = 4;
int64_t remove_id = 2;
@@ -276,7 +277,7 @@ TEST_F(SchedulingTest, SchedulingDeleteClusterNodeTest) {
ASSERT_TRUE(num_nodes - 1 == cluster_resources.NumNodes());
}
TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingModifyClusterNodeTest) {
int num_nodes = 4;
int64_t update_id = 2;
ClusterResourceScheduler cluster_resources;
@@ -310,7 +311,7 @@ TEST_F(SchedulingTest, SchedulingModifyClusterNodeTest) {
ASSERT_TRUE(num_nodes == cluster_resources.NumNodes());
}
TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
// Create cluster resources.
NodeResources node_resources;
vector<FixedPoint> pred_capacities{10, 5, 3};
@@ -360,7 +361,7 @@ TEST_F(SchedulingTest, SchedulingUpdateAvailableResourcesTest) {
}
}
TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest) {
ClusterResourceScheduler cluster_resources;
NodeResources nr, nr_out;
int64_t node_id = 1;
@@ -400,7 +401,7 @@ TEST_F(SchedulingTest, SchedulingAddOrUpdateNodeTest) {
}
}
TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
TEST_F(ClusterResourceSchedulerTest, SchedulingTaskRequestTest) {
// Create cluster resources containing local node.
NodeResources node_resources;
vector<FixedPoint> pred_capacities{5, 5};
@@ -562,7 +563,7 @@ TEST_F(SchedulingTest, SchedulingTaskRequestTest) {
}
}
TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) {
TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) {
// Create cluster resources containing local node.
NodeResources node_resources;
vector<FixedPoint> pred_capacities{3 /* CPU */, 4 /* MEM */, 5 /* GPU */};
@@ -586,7 +587,7 @@ TEST_F(SchedulingTest, GetLocalAvailableResourcesTest) {
ASSERT_EQ(expected_cluster_resources == available_cluster_resources, true);
}
TEST_F(SchedulingTest, GetCPUInstancesDoubleTest) {
TEST_F(ClusterResourceSchedulerTest, GetCPUInstancesDoubleTest) {
TaskResourceInstances task_resources;
addTaskResourceInstances(true, {1., 1., 1.}, CPU, &task_resources);
addTaskResourceInstances(true, {4.}, MEM, &task_resources);
@@ -598,7 +599,7 @@ TEST_F(SchedulingTest, GetCPUInstancesDoubleTest) {
ASSERT_EQ(EqualVectors(cpu_instances, expected_cpu_instances), true);
}
TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) {
TEST_F(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest) {
NodeResources node_resources;
vector<FixedPoint> pred_capacities{3 /* CPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
@@ -629,7 +630,7 @@ TEST_F(SchedulingTest, AvailableResourceInstancesOpsTest) {
ASSERT_EQ(EqualVectors(instances.available, expected_available), true);
}
TEST_F(SchedulingTest, TaskResourceInstancesTest) {
TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
// Allocate resources for a task request specifying only predefined resources.
{
NodeResources node_resources;
@@ -810,7 +811,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest) {
}
}
TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) {
{
NodeResources node_resources;
vector<FixedPoint> pred_capacities{4. /* CPU */, 4. /* MEM */, 5. /* GPU */};
@@ -842,7 +843,7 @@ TEST_F(SchedulingTest, TaskResourceInstancesTest2) {
}
}
TEST_F(SchedulingTest, TaskGPUResourceInstancesTest) {
TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
{
NodeResources node_resources;
vector<FixedPoint> pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */};
@@ -895,7 +896,8 @@ TEST_F(SchedulingTest, TaskGPUResourceInstancesTest) {
}
}
TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
TEST_F(ClusterResourceSchedulerTest,
UpdateLocalAvailableResourcesFromResourceInstancesTest) {
{
NodeResources node_resources;
vector<FixedPoint> pred_capacities{1 /* CPU */, 1 /* MEM */, 4 /* GPU */};
@@ -942,7 +944,7 @@ TEST_F(SchedulingTest, UpdateLocalAvailableResourcesFromResourceInstancesTest) {
}
}
TEST_F(SchedulingTest, TaskResourceInstanceWithHardRequestTest) {
TEST_F(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest) {
NodeResources node_resources;
vector<FixedPoint> pred_capacities{4. /* CPU */, 2. /* MEM */, 4. /* GPU */};
initNodeResources(node_resources, pred_capacities, EmptyIntVector,
@@ -132,8 +132,8 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
}
void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
Work work = std::make_tuple(task, reply, send_reply_callback);
std::function<void(void)> callback) {
Work work = std::make_tuple(task, reply, callback);
tasks_to_schedule_.push_back(work);
}
@@ -151,7 +151,7 @@ void ClusterTaskManager::Dispatch(
std::shared_ptr<WorkerInterface> worker,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_,
const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
std::function<void(void)> send_reply_callback) {
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
reply->mutable_worker_address()->set_port(worker->Port());
reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());
@@ -202,16 +202,16 @@ void ClusterTaskManager::Dispatch(
}
}
}
send_reply_callback(Status::OK(), nullptr, nullptr);
send_reply_callback();
}
void ClusterTaskManager::Spillback(ClientID spillback_to, std::string address, int port,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback) {
std::function<void(void)> send_reply_callback) {
reply->mutable_retry_at_raylet_address()->set_ip_address(address);
reply->mutable_retry_at_raylet_address()->set_port(port);
reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary());
send_reply_callback(Status::OK(), nullptr, nullptr);
send_reply_callback();
}
} // namespace raylet
@@ -12,7 +12,10 @@
namespace ray {
namespace raylet {
typedef std::tuple<Task, rpc::RequestWorkerLeaseReply *, rpc::SendReplyCallback> Work;
/// Work represents all the information needed to make a scheduling decision.
/// This includes the task, the information we need to communicate to
/// dispatch/spillback and the callback to trigger it.
typedef std::tuple<Task, rpc::RequestWorkerLeaseReply *, std::function<void(void)>> Work;
typedef std::function<boost::optional<rpc::GcsNodeInfo>(const ClientID &node_id)>
NodeInfoGetter;
@@ -68,7 +71,7 @@ class ClusterTaskManager {
/// \param fn: The function used during dispatching.
/// \param task: The incoming task to schedule.
void QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback);
std::function<void(void)>);
/// Move tasks from waiting to ready for dispatch. Called when a task's
/// dependencies are resolved.
@@ -100,11 +103,11 @@ class ClusterTaskManager {
std::shared_ptr<WorkerInterface> worker,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_,
const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback);
std::function<void(void)> send_reply_callback);
void Spillback(ClientID spillback_to, std::string address, int port,
rpc::RequestWorkerLeaseReply *reply,
rpc::SendReplyCallback send_reply_callback);
std::function<void(void)> send_reply_callback);
};
} // namespace raylet
} // namespace ray
@@ -0,0 +1,343 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include <memory>
#include <string>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/common/id.h"
#include "ray/common/task/scheduling_resources.h"
#include "ray/common/task/task.h"
#include "ray/common/task/task_util.h"
#include "ray/common/test_util.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/scheduling_ids.h"
#ifdef UNORDERED_VS_ABSL_MAPS_EVALUATION
#include <chrono>
#include "absl/container/flat_hash_map.h"
#endif // UNORDERED_VS_ABSL_MAPS_EVALUATION
namespace ray {
namespace raylet {
class MockWorkerPool : public WorkerPoolInterface {
public:
std::shared_ptr<WorkerInterface> PopWorker(const TaskSpecification &task_spec) {
if (workers.empty()) {
return nullptr;
}
auto worker_ptr = workers.front();
workers.pop_front();
return worker_ptr;
}
void PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
workers.push_front(worker);
}
std::list<std::shared_ptr<WorkerInterface>> workers;
};
class MockWorker : public WorkerInterface {
public:
MockWorker(WorkerID worker_id, int port) : worker_id_(worker_id), port_(port) {}
WorkerID WorkerId() const { return worker_id_; }
int Port() const { return port_; }
void SetOwnerAddress(const rpc::Address &address) { address_ = address; }
void AssignTaskId(const TaskID &task_id) {}
void AssignJobId(const JobID &job_id) {}
void SetAssignedTask(Task &assigned_task) {}
const std::string IpAddress() const { return address_.ip_address(); }
void SetAllocatedInstances(
std::shared_ptr<TaskResourceInstances> &allocated_instances) {
allocated_instances_ = allocated_instances;
}
void SetLifetimeAllocatedInstances(
std::shared_ptr<TaskResourceInstances> &allocated_instances) {
lifetime_allocated_instances_ = allocated_instances;
}
std::shared_ptr<TaskResourceInstances> GetAllocatedInstances() {
return allocated_instances_;
}
std::shared_ptr<TaskResourceInstances> GetLifetimeAllocatedInstances() {
return lifetime_allocated_instances_;
}
void MarkDead() { RAY_CHECK(false) << "Method unused"; }
bool IsDead() const {
RAY_CHECK(false) << "Method unused";
return false;
}
void MarkBlocked() { RAY_CHECK(false) << "Method unused"; }
void MarkUnblocked() { RAY_CHECK(false) << "Method unused"; }
bool IsBlocked() const {
RAY_CHECK(false) << "Method unused";
return false;
}
Process GetProcess() const {
RAY_CHECK(false) << "Method unused";
return Process::CreateNewDummy();
}
void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; }
Language GetLanguage() const {
RAY_CHECK(false) << "Method unused";
return Language::PYTHON;
}
void Connect(int port) { RAY_CHECK(false) << "Method unused"; }
int AssignedPort() const {
RAY_CHECK(false) << "Method unused";
return -1;
}
void SetAssignedPort(int port) { RAY_CHECK(false) << "Method unused"; }
const TaskID &GetAssignedTaskId() const {
RAY_CHECK(false) << "Method unused";
return TaskID::Nil();
}
bool AddBlockedTaskId(const TaskID &task_id) {
RAY_CHECK(false) << "Method unused";
return false;
}
bool RemoveBlockedTaskId(const TaskID &task_id) {
RAY_CHECK(false) << "Method unused";
return false;
}
const std::unordered_set<TaskID> &GetBlockedTaskIds() const {
RAY_CHECK(false) << "Method unused";
auto *t = new std::unordered_set<TaskID>();
return *t;
}
const JobID &GetAssignedJobId() const {
RAY_CHECK(false) << "Method unused";
return JobID::Nil();
}
void AssignActorId(const ActorID &actor_id) { RAY_CHECK(false) << "Method unused"; }
const ActorID &GetActorId() const {
RAY_CHECK(false) << "Method unused";
return ActorID::Nil();
}
void MarkDetachedActor() { RAY_CHECK(false) << "Method unused"; }
bool IsDetachedActor() const {
RAY_CHECK(false) << "Method unused";
return false;
}
const std::shared_ptr<ClientConnection> Connection() const {
RAY_CHECK(false) << "Method unused";
return nullptr;
}
const rpc::Address &GetOwnerAddress() const {
RAY_CHECK(false) << "Method unused";
return address_;
}
const ResourceIdSet &GetLifetimeResourceIds() const {
RAY_CHECK(false) << "Method unused";
auto *t = new ResourceIdSet();
return *t;
}
void SetLifetimeResourceIds(ResourceIdSet &resource_ids) {
RAY_CHECK(false) << "Method unused";
}
void ResetLifetimeResourceIds() { RAY_CHECK(false) << "Method unused"; }
const ResourceIdSet &GetTaskResourceIds() const {
RAY_CHECK(false) << "Method unused";
auto *t = new ResourceIdSet();
return *t;
}
void SetTaskResourceIds(ResourceIdSet &resource_ids) {
RAY_CHECK(false) << "Method unused";
}
void ResetTaskResourceIds() { RAY_CHECK(false) << "Method unused"; }
ResourceIdSet ReleaseTaskCpuResources() {
RAY_CHECK(false) << "Method unused";
auto *t = new ResourceIdSet();
return *t;
}
void AcquireTaskCpuResources(const ResourceIdSet &cpu_resources) {
RAY_CHECK(false) << "Method unused";
}
Status AssignTask(const Task &task, const ResourceIdSet &resource_id_set) {
RAY_CHECK(false) << "Method unused";
Status s;
return s;
}
void DirectActorCallArgWaitComplete(int64_t tag) {
RAY_CHECK(false) << "Method unused";
}
void ClearAllocatedInstances() { RAY_CHECK(false) << "Method unused"; }
void ClearLifetimeAllocatedInstances() { RAY_CHECK(false) << "Method unused"; }
void SetBorrowedCPUInstances(std::vector<double> &cpu_instances) {
RAY_CHECK(false) << "Method unused";
}
std::vector<double> &GetBorrowedCPUInstances() {
RAY_CHECK(false) << "Method unused";
auto *t = new std::vector<double>();
return *t;
}
void ClearBorrowedCPUInstances() { RAY_CHECK(false) << "Method unused"; }
Task &GetAssignedTask() {
RAY_CHECK(false) << "Method unused";
auto *t = new Task();
return *t;
}
bool IsRegistered() {
RAY_CHECK(false) << "Method unused";
return false;
}
rpc::CoreWorkerClient *rpc_client() {
RAY_CHECK(false) << "Method unused";
return nullptr;
}
private:
WorkerID worker_id_;
int port_;
rpc::Address address_;
std::shared_ptr<TaskResourceInstances> allocated_instances_;
std::shared_ptr<TaskResourceInstances> lifetime_allocated_instances_;
};
std::shared_ptr<ClusterResourceScheduler> CreateSingleNodeScheduler(
const std::string &id) {
std::unordered_map<std::string, double> local_node_resources;
local_node_resources[ray::kCPU_ResourceLabel] = 8;
local_node_resources[ray::kGPU_ResourceLabel] = 4;
local_node_resources[ray::kMemory_ResourceLabel] = 128;
auto scheduler = std::make_shared<ClusterResourceScheduler>(
ClusterResourceScheduler(id, local_node_resources));
return scheduler;
}
Task CreateTask(const std::unordered_map<std::string, double> &required_resources) {
TaskSpecBuilder spec_builder;
TaskID id = RandomTaskId();
JobID job_id = RandomJobId();
rpc::Address address;
spec_builder.SetCommonTaskSpec(
id, Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""),
job_id, TaskID::Nil(), 0, TaskID::Nil(), address, 0, required_resources, {});
rpc::TaskExecutionSpec execution_spec_message;
execution_spec_message.set_num_forwards(1);
return Task(spec_builder.Build(), TaskExecutionSpecification(execution_spec_message));
}
class ClusterTaskManagerTest : public ::testing::Test {
public:
ClusterTaskManagerTest()
: id_(ClientID::FromRandom()),
single_node_resource_scheduler_(CreateSingleNodeScheduler(id_.Binary())),
fulfills_dependencies_calls_(0),
dependencies_fulfilled_(true),
node_info_calls_(0),
node_info_(boost::optional<rpc::GcsNodeInfo>{}),
task_manager_(id_, single_node_resource_scheduler_,
[this](const Task &_task) {
fulfills_dependencies_calls_++;
return dependencies_fulfilled_;
},
[this](const ClientID &node_id) {
node_info_calls_++;
return node_info_;
}) {}
void SetUp() {}
void Shutdown() {}
ClientID id_;
std::shared_ptr<ClusterResourceScheduler> single_node_resource_scheduler_;
MockWorkerPool pool_;
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> leased_workers_;
int fulfills_dependencies_calls_;
bool dependencies_fulfilled_;
int node_info_calls_;
boost::optional<rpc::GcsNodeInfo> node_info_;
ClusterTaskManager task_manager_;
};
TEST_F(ClusterTaskManagerTest, BasicTest) {
/*
Test basic scheduler functionality:
1. Queue and attempt to schedule/dispatch atest with no workers available
2. A worker becomes available, dispatch again.
*/
Task task = CreateTask({{ray::kCPU_ResourceLabel, 4}});
rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred;
auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; };
task_manager_.QueueTask(task, &reply, callback);
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0);
ASSERT_EQ(pool_.workers.size(), 0);
std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_EQ(callback_occurred, true);
ASSERT_EQ(leased_workers_.size(), 1);
ASSERT_EQ(pool_.workers.size(), 0);
ASSERT_EQ(fulfills_dependencies_calls_, 0);
ASSERT_EQ(node_info_calls_, 0);
}
} // namespace raylet
} // namespace ray
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}