[New scheduler] Fix test_global_state (#12586)

This commit is contained in:
Alex Wu
2020-12-11 21:47:01 -08:00
committed by GitHub
parent 03d869d51c
commit aa64cd4534
8 changed files with 208 additions and 16 deletions
+4 -3
View File
@@ -144,7 +144,6 @@ def test_global_state_actor_entry(ray_start_regular):
@pytest.mark.parametrize("max_shapes", [0, 2, -1])
@pytest.mark.skipif(new_scheduler_enabled(), reason="broken")
def test_load_report(shutdown_only, max_shapes):
resource1 = "A"
resource2 = "B"
@@ -196,6 +195,8 @@ def test_load_report(shutdown_only, max_shapes):
if max_shapes != -1:
assert len(checker.report) <= max_shapes
print(checker.report)
if max_shapes > 0:
# Check that we always include the 1-CPU resource shape.
one_cpu_shape = {"CPU": 1}
@@ -216,7 +217,8 @@ def test_load_report(shutdown_only, max_shapes):
global_state_accessor.disconnect()
@pytest.mark.skipif(new_scheduler_enabled(), reason="broken")
@pytest.mark.skipif(
new_scheduler_enabled(), reason="requires placement groups")
def test_placement_group_load_report(ray_start_cluster):
cluster = ray_start_cluster
# Add a head node that doesn't have gpu resource.
@@ -285,7 +287,6 @@ def test_placement_group_load_report(ray_start_cluster):
global_state_accessor.disconnect()
@pytest.mark.skipif(new_scheduler_enabled(), reason="broken")
def test_backlog_report(shutdown_only):
cluster = ray.init(
num_cpus=1, _system_config={
+2
View File
@@ -35,6 +35,8 @@ void Task::CopyTaskExecutionSpec(const Task &task) {
task_execution_spec_ = task.task_execution_spec_;
}
void Task::SetBacklogSize(int64_t backlog_size) { backlog_size_ = backlog_size; }
int64_t Task::BacklogSize() const { return backlog_size_; }
std::string Task::DebugString() const {
+2
View File
@@ -89,6 +89,8 @@ class Task {
/// Returns the cancellation task callback, or nullptr.
const CancelTaskCallback &OnCancellation() const { return on_cancellation_; }
void SetBacklogSize(int64_t backlog_size);
int64_t BacklogSize() const;
std::string DebugString() const;
@@ -76,6 +76,15 @@ bool ClusterResourceScheduler::RemoveNode(const std::string &node_id_string) {
return RemoveNode(node_id);
}
bool ClusterResourceScheduler::IsLocallyFeasible(
const std::unordered_map<std::string, double> shape) {
const TaskRequest task_req = ResourceMapToTaskRequest(string_to_int_map_, shape);
RAY_CHECK(nodes_.contains(local_node_id_));
const auto &it = nodes_.find(local_node_id_);
RAY_CHECK(it != nodes_.end());
return IsFeasible(task_req, it->second.GetLocalView());
}
bool ClusterResourceScheduler::IsFeasible(const TaskRequest &task_req,
const NodeResources &resources) const {
// First, check predefined resources.
@@ -73,6 +73,13 @@ class ClusterResourceScheduler {
bool RemoveNode(int64_t node_id);
bool RemoveNode(const std::string &node_id_string);
/// Check whether a task request is feasible on a given node. A node is
/// feasible if it has the total resources needed to eventually execute the
/// task, even if those resources are currently allocated.
///
/// \param shape The resource demand's shape.
bool IsLocallyFeasible(const std::unordered_map<std::string, double> shape);
/// Check whether a task request is feasible on a given node. A node is
/// feasible if it has the total resources needed to eventually execute the
/// task, even if those resources are currently allocated.
+117 -12
View File
@@ -17,7 +17,10 @@ ClusterTaskManager::ClusterTaskManager(
cluster_resource_scheduler_(cluster_resource_scheduler),
fulfills_dependencies_func_(fulfills_dependencies_func),
is_owner_alive_(is_owner_alive),
get_node_info_(get_node_info) {}
get_node_info_(get_node_info),
max_resource_shapes_per_load_report_(
RayConfig::instance().max_resource_shapes_per_load_report()),
report_worker_backlog_(RayConfig::instance().report_worker_backlog()) {}
bool ClusterTaskManager::SchedulePendingTasks() {
bool did_schedule = false;
@@ -134,7 +137,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
if (worker_leased) {
auto reply = std::get<1>(*work_it);
auto callback = std::get<2>(*work_it);
Dispatch(worker, leased_workers, spec, reply, callback);
Dispatch(worker, leased_workers, task, reply, callback);
} else {
worker_pool.PushWorker(worker);
}
@@ -199,6 +202,7 @@ void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseRepl
Work work = std::make_tuple(task, reply, callback);
const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass();
tasks_to_schedule_[scheduling_class].push_back(work);
AddToBacklogTracker(task);
}
void ClusterTaskManager::TasksUnblocked(const std::vector<TaskID> ready_ids) {
@@ -236,7 +240,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
shapes_it++) {
auto &work_queue = shapes_it->second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) {
const auto &task = std::get<0>(*work_it);
if (task.GetTaskSpecification().TaskId() == task_id) {
RemoveFromBacklogTracker(task);
RAY_LOG(DEBUG) << "Canceling task " << task_id;
ReplyCancelled(*work_it);
work_queue.erase(work_it);
@@ -251,7 +257,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
shapes_it++) {
auto &work_queue = shapes_it->second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
if (std::get<0>(*work_it).GetTaskSpecification().TaskId() == task_id) {
const auto &task = std::get<0>(*work_it);
if (task.GetTaskSpecification().TaskId() == task_id) {
RemoveFromBacklogTracker(task);
ReplyCancelled(*work_it);
work_queue.erase(work_it);
if (work_queue.empty()) {
@@ -264,6 +272,8 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
auto iter = waiting_tasks_.find(task_id);
if (iter != waiting_tasks_.end()) {
const auto &task = std::get<0>(iter->second);
RemoveFromBacklogTracker(task);
ReplyCancelled(iter->second);
waiting_tasks_.erase(iter);
return true;
@@ -275,6 +285,9 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
void ClusterTaskManager::FillResourceUsage(
bool light_report_resource_usage_enabled,
std::shared_ptr<rpc::ResourcesData> data) const {
if (max_resource_shapes_per_load_report_ == 0) {
return;
}
// TODO (WangTao): Find a way to check if load changed and combine it with light
// heartbeat. Now we just report it every time.
data->set_resource_load_changed(true);
@@ -282,9 +295,59 @@ void ClusterTaskManager::FillResourceUsage(
auto resource_load_by_shape =
data->mutable_resource_load_by_shape()->mutable_resource_demands();
// TODO (Alex): Implement the 1-CPU task optimization.
int num_reported = 0;
// 1-CPU optimization
static const ResourceSet one_cpu_resource_set(
std::unordered_map<std::string, double>({{kCPU_ResourceLabel, 1}}));
static const SchedulingClass one_cpu_scheduling_cls(
TaskSpecification::GetSchedulingClass(one_cpu_resource_set));
{
num_reported++;
int count = 0;
auto it = tasks_to_schedule_.find(one_cpu_scheduling_cls);
if (it != tasks_to_schedule_.end()) {
count += it->second.size();
}
it = tasks_to_dispatch_.find(one_cpu_scheduling_cls);
if (it != tasks_to_dispatch_.end()) {
count += it->second.size();
}
if (count > 0) {
auto by_shape_entry = resource_load_by_shape->Add();
for (const auto &resource : one_cpu_resource_set.GetResourceMap()) {
// Add to `resource_loads`.
const auto &label = resource.first;
const auto &quantity = resource.second;
(*resource_loads)[label] += quantity * count;
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
}
int num_ready = by_shape_entry->num_ready_requests_queued();
by_shape_entry->set_num_ready_requests_queued(num_ready + count);
auto backlog_it = backlog_tracker_.find(one_cpu_scheduling_cls);
if (backlog_it != backlog_tracker_.end()) {
by_shape_entry->set_backlog_size(backlog_it->second);
}
}
}
for (const auto &pair : tasks_to_schedule_) {
const auto &scheduling_class = pair.first;
if (scheduling_class == one_cpu_scheduling_cls) {
continue;
}
if (num_reported++ >= max_resource_shapes_per_load_report_ &&
max_resource_shapes_per_load_report_ >= 0) {
// TODO (Alex): It's possible that we skip a different scheduling key which contains
// the same resources.
break;
}
const auto &resources =
TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
.GetResourceMap();
@@ -301,14 +364,35 @@ void ClusterTaskManager::FillResourceUsage(
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_infeasible_requests_queued(count);
}
// If a task is not feasible on the local node it will not be feasible on any other
// node in the cluster. See the scheduling policy defined by
// ClusterResourceScheduler::GetBestSchedulableNode for more details.
if (cluster_resource_scheduler_->IsLocallyFeasible(resources)) {
int num_ready = by_shape_entry->num_ready_requests_queued();
by_shape_entry->set_num_ready_requests_queued(num_ready + count);
} else {
int num_infeasible = by_shape_entry->num_infeasible_requests_queued();
by_shape_entry->set_num_infeasible_requests_queued(num_infeasible + count);
}
auto backlog_it = backlog_tracker_.find(scheduling_class);
if (backlog_it != backlog_tracker_.end()) {
by_shape_entry->set_backlog_size(backlog_it->second);
}
}
for (const auto &pair : tasks_to_dispatch_) {
const auto &scheduling_class = pair.first;
if (scheduling_class == one_cpu_scheduling_cls) {
continue;
}
if (num_reported++ >= max_resource_shapes_per_load_report_ &&
max_resource_shapes_per_load_report_ >= 0) {
// TODO (Alex): It's possible that we skip a different scheduling key which contains
// the same resources.
break;
}
const auto &resources =
TaskSpecification::GetSchedulingClassDescriptor(scheduling_class)
.GetResourceMap();
@@ -325,9 +409,12 @@ void ClusterTaskManager::FillResourceUsage(
// Add to `resource_load_by_shape`.
(*by_shape_entry->mutable_shape())[label] = quantity;
// TODO (Alex): Technically being on `tasks_to_schedule` could also mean
// that the entire cluster is utilized.
by_shape_entry->set_num_ready_requests_queued(count);
}
int num_ready = by_shape_entry->num_ready_requests_queued();
by_shape_entry->set_num_ready_requests_queued(num_ready + count);
auto backlog_it = backlog_tracker_.find(scheduling_class);
if (backlog_it != backlog_tracker_.end()) {
by_shape_entry->set_backlog_size(backlog_it->second);
}
}
}
@@ -347,8 +434,9 @@ std::string ClusterTaskManager::DebugString() const {
void ClusterTaskManager::Dispatch(
std::shared_ptr<WorkerInterface> worker,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
const Task &task, rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> send_reply_callback) {
const auto &task_spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId();
// Pass the contact info of the worker to use.
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
@@ -433,5 +521,22 @@ void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work)
send_reply_callback();
}
void ClusterTaskManager::AddToBacklogTracker(const Task &task) {
if (report_worker_backlog_) {
auto cls = task.GetTaskSpecification().GetSchedulingClass();
backlog_tracker_[cls] += task.BacklogSize();
}
}
void ClusterTaskManager::RemoveFromBacklogTracker(const Task &task) {
if (report_worker_backlog_) {
SchedulingClass cls = task.GetTaskSpecification().GetSchedulingClass();
backlog_tracker_[cls] -= task.BacklogSize();
if (backlog_tracker_[cls] == 0) {
backlog_tracker_.erase(backlog_tracker_.find(cls));
}
}
}
} // namespace raylet
} // namespace ray
@@ -128,6 +128,9 @@ class ClusterTaskManager {
std::function<bool(const WorkerID &, const NodeID &)> is_owner_alive_;
NodeInfoGetter get_node_info_;
const int max_resource_shapes_per_load_report_;
const bool report_worker_backlog_;
/// Queue of lease requests that are waiting for resources to become available.
/// Tasks move from scheduled -> dispatch | waiting.
std::unordered_map<SchedulingClass, std::deque<Work>> tasks_to_schedule_;
@@ -140,6 +143,9 @@ class ClusterTaskManager {
/// Tasks move from waiting -> dispatch.
absl::flat_hash_map<TaskID, Work> waiting_tasks_;
/// Track the cumulative backlog of all workers requesting a lease to this raylet.
std::unordered_map<SchedulingClass, int> backlog_tracker_;
/// Determine whether a task should be immediately dispatched,
/// or placed on a wait queue.
///
@@ -149,10 +155,13 @@ class ClusterTaskManager {
void Dispatch(
std::shared_ptr<WorkerInterface> worker,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_,
const TaskSpecification &task_spec, rpc::RequestWorkerLeaseReply *reply,
const Task &task, rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> send_reply_callback);
void Spillback(const NodeID &spillback_to, const Work &work);
void AddToBacklogTracker(const Task &task);
void RemoveFromBacklogTracker(const Task &task);
};
} // namespace raylet
} // namespace ray
@@ -480,6 +480,63 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) {
}
}
TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
/*
Test basic scheduler functionality:
1. Queue and attempt to schedule/dispatch atest with no workers available
2. A worker becomes available, dispatch again.
*/
rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false;
bool *callback_occurred_ptr = &callback_occurred;
auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; };
std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));
std::vector<TaskID> to_cancel;
for (int i = 0; i < 10; i++) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}});
task.SetBacklogSize(i);
task_manager_.QueueTask(task, &reply, callback);
to_cancel.push_back(task.GetTaskSpecification().TaskId());
}
task_manager_.SchedulePendingTasks();
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_);
ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0);
ASSERT_EQ(pool_.workers.size(), 1);
ASSERT_EQ(fulfills_dependencies_calls_, 0);
ASSERT_EQ(node_info_calls_, 0);
auto data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(false, data);
auto resource_load_by_shape = data->resource_load_by_shape();
auto shape1 = resource_load_by_shape.resource_demands()[0];
ASSERT_EQ(shape1.backlog_size(), 45);
ASSERT_EQ(shape1.num_infeasible_requests_queued(), 10);
ASSERT_EQ(shape1.num_ready_requests_queued(), 0);
for (auto &task_id : to_cancel) {
ASSERT_TRUE(task_manager_.CancelTask(task_id));
}
data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(false, data);
resource_load_by_shape = data->resource_load_by_shape();
shape1 = resource_load_by_shape.resource_demands()[0];
ASSERT_EQ(shape1.backlog_size(), 0);
ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0);
ASSERT_EQ(shape1.num_ready_requests_queued(), 0);
}
TEST_F(ClusterTaskManagerTest, OwnerDeadTest) {
/*
Test the race condition in which the owner of a task dies while the task is pending.