Revert "Enabling the cancellation of non-actor tasks in a worker's queue (#12117)" (#13178)

This reverts commit b4d688b4a6.
This commit is contained in:
Stephanie Wang
2021-01-04 17:27:48 -08:00
committed by GitHub
parent e181515dff
commit b765914a1b
6 changed files with 17 additions and 140 deletions
+3 -9
View File
@@ -760,7 +760,6 @@ void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) {
}
absl::MutexLock lock(&mutex_);
while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) {
auto &spec = to_resubmit_.front().second;
if (spec.IsActorTask()) {
@@ -2217,17 +2216,12 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request,
rpc::SendReplyCallback send_reply_callback) {
absl::MutexLock lock(&mutex_);
TaskID task_id = TaskID::FromBinary(request.intended_task_id());
bool requested_task_running = main_thread_task_id_ == task_id;
bool success = requested_task_running;
bool success = main_thread_task_id_ == task_id;
// Try non-force kill
if (requested_task_running && !request.force_kill()) {
if (success && !request.force_kill()) {
RAY_LOG(INFO) << "Interrupting a running task " << main_thread_task_id_;
success = options_.kill_main();
} else if (!requested_task_running) {
// If the task is not currently running, check if it is in the worker's queue of
// normal tasks, and remove it if found.
success = direct_task_receiver_->CancelQueuedNormalTask(task_id);
}
if (request.recursive()) {
auto recursive_cancel = CancelChildren(task_id, request.force_kill());
@@ -2240,7 +2234,7 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request,
send_reply_callback(Status::OK(), nullptr, nullptr);
// Do force kill after reply callback sent
if (requested_task_running && request.force_kill()) {
if (success && request.force_kill()) {
RAY_LOG(INFO) << "Force killing a worker running " << main_thread_task_id_;
Disconnect();
if (options_.enable_logging) {
@@ -841,48 +841,6 @@ TEST_F(SingleNodeTest, TestNormalTaskLocal) {
TestNormalTask(resources);
}
TEST_F(SingleNodeTest, TestCancelTasks) {
auto &driver = CoreWorkerProcess::GetCoreWorker();
// Create two functions, each implementing a while(true) loop.
RayFunction func1(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"WhileTrueLoop", "", "", ""));
RayFunction func2(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython(
"WhileTrueLoop", "", "", ""));
// Return IDs for the two functions that implement while(true) loops.
std::vector<ObjectID> return_ids1;
std::vector<ObjectID> return_ids2;
// Create default args and options needed to submit the tasks that encapsulate func1 and
// func2.
std::vector<std::unique_ptr<TaskArg>> args;
TaskOptions options;
// Submit func1. The function should start looping forever.
driver.SubmitTask(func1, args, options, &return_ids1, /*max_retries=*/0,
std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/"");
ASSERT_EQ(return_ids1.size(), 1);
// Submit func2. The function should be queued at the worker indefinitely.
driver.SubmitTask(func2, args, options, &return_ids2, /*max_retries=*/0,
std::make_pair(PlacementGroupID::Nil(), -1), true,
/*debugger_breakpoint=*/"");
ASSERT_EQ(return_ids2.size(), 1);
// Cancel func2 by removing it from the worker's queue
RAY_CHECK_OK(driver.CancelTask(return_ids2[0], true, false));
// Cancel func1, which is currently running.
RAY_CHECK_OK(driver.CancelTask(return_ids1[0], true, false));
// TestNormalTask will get stuck unless both func1 and func2 have been cancelled. Thus,
// if TestNormalTask succeeds, we know that func2 must have been removed from the
// worker's queue.
std::unordered_map<std::string, double> resources;
TestNormalTask(resources);
}
TEST_F(TwoNodeTest, TestNormalTaskCrossNodes) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
-10
View File
@@ -79,8 +79,6 @@ class MockWorker {
} else if ("MergeInputArgsAsOutput" == typed_descriptor->ModuleName()) {
// Merge input args and write the merged content to each of return ids
return MergeInputArgsAsOutput(args, return_ids, results);
} else if ("WhileTrueLoop" == typed_descriptor->ModuleName()) {
return WhileTrueLoop(args, return_ids, results);
} else {
return Status::TypeError("Unknown function descriptor: " +
typed_descriptor->ModuleName());
@@ -130,14 +128,6 @@ class MockWorker {
return Status::OK();
}
Status WhileTrueLoop(const std::vector<std::shared_ptr<RayObject>> &args,
const std::vector<ObjectID> &return_ids,
std::vector<std::shared_ptr<RayObject>> *results) {
while (1) {
}
return Status::OK();
}
int64_t prev_seq_no_ = 0;
};
@@ -66,9 +66,9 @@ TEST(SchedulingQueueTest, TestWaitForObjects) {
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue.Add(0, -1, fn_ok, fn_rej);
queue.Add(1, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj1}));
queue.Add(2, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj2}));
queue.Add(3, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj3}));
queue.Add(1, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj1}));
queue.Add(2, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj2}));
queue.Add(3, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj3}));
ASSERT_EQ(n_ok, 1);
waiter.Complete(0);
@@ -92,7 +92,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) {
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue.Add(0, -1, fn_ok, fn_rej);
queue.Add(1, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj1}));
queue.Add(1, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj1}));
ASSERT_EQ(n_ok, 1);
io_service.run();
ASSERT_EQ(n_rej, 0);
@@ -158,25 +158,6 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
ASSERT_EQ(n_rej, 2);
}
TEST(SchedulingQueueTest, TestCancelQueuedTask) {
NormalSchedulingQueue *queue = new NormalSchedulingQueue();
ASSERT_TRUE(queue->TaskQueueEmpty());
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue->Add(-1, -1, fn_ok, fn_rej);
queue->Add(-1, -1, fn_ok, fn_rej);
queue->Add(-1, -1, fn_ok, fn_rej);
queue->Add(-1, -1, fn_ok, fn_rej);
queue->Add(-1, -1, fn_ok, fn_rej);
ASSERT_TRUE(queue->CancelTaskIfFound(TaskID::Nil()));
ASSERT_FALSE(queue->TaskQueueEmpty());
queue->ScheduleRequests();
ASSERT_EQ(n_ok, 4);
ASSERT_EQ(n_rej, 0);
}
} // namespace ray
int main(int argc, char **argv) {
@@ -482,12 +482,12 @@ void CoreWorkerDirectTaskReceiver::HandleTask(
// TODO(swang): Remove this with legacy raylet code.
dependencies.pop_back();
it->second->Add(request.sequence_number(), request.client_processed_up_to(),
accept_callback, reject_callback, task_spec.TaskId(), dependencies);
accept_callback, reject_callback, dependencies);
} else {
// Add the normal task's callbacks to the non-actor scheduling queue.
normal_scheduling_queue_->Add(request.sequence_number(),
request.client_processed_up_to(), accept_callback,
reject_callback, task_spec.TaskId(), dependencies);
reject_callback, dependencies);
}
}
@@ -501,10 +501,4 @@ void CoreWorkerDirectTaskReceiver::RunNormalTasksFromQueue() {
normal_scheduling_queue_->ScheduleRequests();
}
bool CoreWorkerDirectTaskReceiver::CancelQueuedNormalTask(TaskID task_id) {
// Look up the task to be canceled in the queue of normal tasks. If it is found and
// removed successfully, return true.
return normal_scheduling_queue_->CancelTaskIfFound(task_id);
}
} // namespace ray
@@ -254,23 +254,19 @@ class InboundRequest {
public:
InboundRequest(){};
InboundRequest(std::function<void()> accept_callback,
std::function<void()> reject_callback, TaskID task_id,
bool has_dependencies)
std::function<void()> reject_callback, bool has_dependencies)
: accept_callback_(accept_callback),
reject_callback_(reject_callback),
task_id(task_id),
has_pending_dependencies_(has_dependencies) {}
void Accept() { accept_callback_(); }
void Cancel() { reject_callback_(); }
bool CanExecute() const { return !has_pending_dependencies_; }
ray::TaskID TaskID() const { return task_id; }
void MarkDependenciesSatisfied() { has_pending_dependencies_ = false; }
private:
std::function<void()> accept_callback_;
std::function<void()> reject_callback_;
ray::TaskID task_id;
bool has_pending_dependencies_;
};
@@ -350,11 +346,10 @@ class SchedulingQueue {
public:
virtual void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request,
std::function<void()> reject_request, TaskID task_id = TaskID::Nil(),
std::function<void()> reject_request,
const std::vector<rpc::ObjectReference> &dependencies = {}) = 0;
virtual void ScheduleRequests() = 0;
virtual bool TaskQueueEmpty() const = 0;
virtual bool CancelTaskIfFound(TaskID task_id) = 0;
virtual ~SchedulingQueue(){};
};
@@ -376,7 +371,6 @@ class ActorSchedulingQueue : public SchedulingQueue {
/// Add a new actor task's callbacks to the worker queue.
void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request, std::function<void()> reject_request,
TaskID task_id = TaskID::Nil(),
const std::vector<rpc::ObjectReference> &dependencies = {}) {
// A seq_no of -1 means no ordering constraint. Actor tasks must be executed in order.
RAY_CHECK(seq_no != -1);
@@ -389,7 +383,7 @@ class ActorSchedulingQueue : public SchedulingQueue {
}
RAY_LOG(DEBUG) << "Enqueue " << seq_no << " cur seqno " << next_seq_no_;
pending_actor_tasks_[seq_no] =
InboundRequest(accept_request, reject_request, task_id, dependencies.size() > 0);
InboundRequest(accept_request, reject_request, dependencies.size() > 0);
if (dependencies.size() > 0) {
waiter_.Wait(dependencies, [seq_no, this]() {
RAY_CHECK(boost::this_thread::get_id() == main_thread_id_);
@@ -403,15 +397,6 @@ class ActorSchedulingQueue : public SchedulingQueue {
ScheduleRequests();
}
// We don't allow the cancellation of actor tasks, so invoking CancelTaskIfFound results
// in a fatal error.
bool CancelTaskIfFound(TaskID task_id) {
RAY_CHECK(false) << "Cannot cancel actor tasks";
// The return instruction will never be executed, but we need to include it
// nonetheless because this is a non-void function.
return false;
}
/// Schedules as many requests as possible in sequence.
void ScheduleRequests() {
// Only call SetMaxActorConcurrency to configure threadpool size when the
@@ -535,45 +520,22 @@ class NormalSchedulingQueue : public SchedulingQueue {
/// Add a new task's callbacks to the worker queue.
void Add(int64_t seq_no, int64_t client_processed_up_to,
std::function<void()> accept_request, std::function<void()> reject_request,
TaskID task_id = TaskID::Nil(),
const std::vector<rpc::ObjectReference> &dependencies = {}) {
absl::MutexLock lock(&mu_);
// Normal tasks should not have ordering constraints.
RAY_CHECK(seq_no == -1);
// Create a InboundRequest object for the new task, and add it to the queue.
pending_normal_tasks_.push_back(
InboundRequest(accept_request, reject_request, task_id, dependencies.size() > 0));
}
// Search for an InboundRequest associated with the task that we are trying to cancel.
// If found, remove the InboundRequest from the queue and return true. Otherwise, return
// false.
bool CancelTaskIfFound(TaskID task_id) {
absl::MutexLock lock(&mu_);
for (std::deque<InboundRequest>::reverse_iterator it = pending_normal_tasks_.rbegin();
it != pending_normal_tasks_.rend(); ++it) {
if (it->TaskID() == task_id) {
pending_normal_tasks_.erase(std::next(it).base());
return true;
}
}
return false;
InboundRequest(accept_request, reject_request, dependencies.size() > 0));
}
/// Schedules as many requests as possible in sequence.
void ScheduleRequests() {
while (true) {
InboundRequest head;
{
absl::MutexLock lock(&mu_);
if (!pending_normal_tasks_.empty()) {
head = pending_normal_tasks_.front();
pending_normal_tasks_.pop_front();
} else {
return;
}
}
absl::MutexLock lock(&mu_);
while (!pending_normal_tasks_.empty()) {
auto &head = pending_normal_tasks_.front();
head.Accept();
pending_normal_tasks_.pop_front();
}
}
@@ -621,8 +583,6 @@ class CoreWorkerDirectTaskReceiver {
/// Pop tasks from the queue and execute them sequentially
void RunNormalTasksFromQueue();
bool CancelQueuedNormalTask(TaskID task_id);
private:
// Worker context.
WorkerContext &worker_context_;