diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index db9ec8e89..da44b1453 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -760,7 +760,6 @@ void CoreWorker::InternalHeartbeat(const boost::system::error_code &error) { } absl::MutexLock lock(&mutex_); - while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) { auto &spec = to_resubmit_.front().second; if (spec.IsActorTask()) { @@ -2217,17 +2216,12 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request, rpc::SendReplyCallback send_reply_callback) { absl::MutexLock lock(&mutex_); TaskID task_id = TaskID::FromBinary(request.intended_task_id()); - bool requested_task_running = main_thread_task_id_ == task_id; - bool success = requested_task_running; + bool success = main_thread_task_id_ == task_id; // Try non-force kill - if (requested_task_running && !request.force_kill()) { + if (success && !request.force_kill()) { RAY_LOG(INFO) << "Interrupting a running task " << main_thread_task_id_; success = options_.kill_main(); - } else if (!requested_task_running) { - // If the task is not currently running, check if it is in the worker's queue of - // normal tasks, and remove it if found. - success = direct_task_receiver_->CancelQueuedNormalTask(task_id); } if (request.recursive()) { auto recursive_cancel = CancelChildren(task_id, request.force_kill()); @@ -2240,7 +2234,7 @@ void CoreWorker::HandleCancelTask(const rpc::CancelTaskRequest &request, send_reply_callback(Status::OK(), nullptr, nullptr); // Do force kill after reply callback sent - if (requested_task_running && request.force_kill()) { + if (success && request.force_kill()) { RAY_LOG(INFO) << "Force killing a worker running " << main_thread_task_id_; Disconnect(); if (options_.enable_logging) { diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 3069070ec..0c4d69149 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -841,48 +841,6 @@ TEST_F(SingleNodeTest, TestNormalTaskLocal) { TestNormalTask(resources); } -TEST_F(SingleNodeTest, TestCancelTasks) { - auto &driver = CoreWorkerProcess::GetCoreWorker(); - - // Create two functions, each implementing a while(true) loop. - RayFunction func1(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( - "WhileTrueLoop", "", "", "")); - RayFunction func2(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( - "WhileTrueLoop", "", "", "")); - // Return IDs for the two functions that implement while(true) loops. - std::vector return_ids1; - std::vector return_ids2; - - // Create default args and options needed to submit the tasks that encapsulate func1 and - // func2. - std::vector> args; - TaskOptions options; - - // Submit func1. The function should start looping forever. - driver.SubmitTask(func1, args, options, &return_ids1, /*max_retries=*/0, - std::make_pair(PlacementGroupID::Nil(), -1), true, - /*debugger_breakpoint=*/""); - ASSERT_EQ(return_ids1.size(), 1); - - // Submit func2. The function should be queued at the worker indefinitely. - driver.SubmitTask(func2, args, options, &return_ids2, /*max_retries=*/0, - std::make_pair(PlacementGroupID::Nil(), -1), true, - /*debugger_breakpoint=*/""); - ASSERT_EQ(return_ids2.size(), 1); - - // Cancel func2 by removing it from the worker's queue - RAY_CHECK_OK(driver.CancelTask(return_ids2[0], true, false)); - - // Cancel func1, which is currently running. - RAY_CHECK_OK(driver.CancelTask(return_ids1[0], true, false)); - - // TestNormalTask will get stuck unless both func1 and func2 have been cancelled. Thus, - // if TestNormalTask succeeds, we know that func2 must have been removed from the - // worker's queue. - std::unordered_map resources; - TestNormalTask(resources); -} - TEST_F(TwoNodeTest, TestNormalTaskCrossNodes) { std::unordered_map resources; resources.emplace("resource1", 1); diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index ed30002c6..4439519bb 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -79,8 +79,6 @@ class MockWorker { } else if ("MergeInputArgsAsOutput" == typed_descriptor->ModuleName()) { // Merge input args and write the merged content to each of return ids return MergeInputArgsAsOutput(args, return_ids, results); - } else if ("WhileTrueLoop" == typed_descriptor->ModuleName()) { - return WhileTrueLoop(args, return_ids, results); } else { return Status::TypeError("Unknown function descriptor: " + typed_descriptor->ModuleName()); @@ -130,14 +128,6 @@ class MockWorker { return Status::OK(); } - Status WhileTrueLoop(const std::vector> &args, - const std::vector &return_ids, - std::vector> *results) { - while (1) { - } - return Status::OK(); - } - int64_t prev_seq_no_ = 0; }; diff --git a/src/ray/core_worker/test/scheduling_queue_test.cc b/src/ray/core_worker/test/scheduling_queue_test.cc index 6854c1810..8c8e60fd5 100644 --- a/src/ray/core_worker/test/scheduling_queue_test.cc +++ b/src/ray/core_worker/test/scheduling_queue_test.cc @@ -66,9 +66,9 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { auto fn_ok = [&n_ok]() { n_ok++; }; auto fn_rej = [&n_rej]() { n_rej++; }; queue.Add(0, -1, fn_ok, fn_rej); - queue.Add(1, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj1})); - queue.Add(2, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj2})); - queue.Add(3, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj3})); + queue.Add(1, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj1})); + queue.Add(2, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj2})); + queue.Add(3, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj3})); ASSERT_EQ(n_ok, 1); waiter.Complete(0); @@ -92,7 +92,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { auto fn_ok = [&n_ok]() { n_ok++; }; auto fn_rej = [&n_rej]() { n_rej++; }; queue.Add(0, -1, fn_ok, fn_rej); - queue.Add(1, -1, fn_ok, fn_rej, TaskID::Nil(), ObjectIdsToRefs({obj1})); + queue.Add(1, -1, fn_ok, fn_rej, ObjectIdsToRefs({obj1})); ASSERT_EQ(n_ok, 1); io_service.run(); ASSERT_EQ(n_rej, 0); @@ -158,25 +158,6 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) { ASSERT_EQ(n_rej, 2); } -TEST(SchedulingQueueTest, TestCancelQueuedTask) { - NormalSchedulingQueue *queue = new NormalSchedulingQueue(); - ASSERT_TRUE(queue->TaskQueueEmpty()); - int n_ok = 0; - int n_rej = 0; - auto fn_ok = [&n_ok]() { n_ok++; }; - auto fn_rej = [&n_rej]() { n_rej++; }; - queue->Add(-1, -1, fn_ok, fn_rej); - queue->Add(-1, -1, fn_ok, fn_rej); - queue->Add(-1, -1, fn_ok, fn_rej); - queue->Add(-1, -1, fn_ok, fn_rej); - queue->Add(-1, -1, fn_ok, fn_rej); - ASSERT_TRUE(queue->CancelTaskIfFound(TaskID::Nil())); - ASSERT_FALSE(queue->TaskQueueEmpty()); - queue->ScheduleRequests(); - ASSERT_EQ(n_ok, 4); - ASSERT_EQ(n_rej, 0); -} - } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index bac80af4f..e266b0d94 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -482,12 +482,12 @@ void CoreWorkerDirectTaskReceiver::HandleTask( // TODO(swang): Remove this with legacy raylet code. dependencies.pop_back(); it->second->Add(request.sequence_number(), request.client_processed_up_to(), - accept_callback, reject_callback, task_spec.TaskId(), dependencies); + accept_callback, reject_callback, dependencies); } else { // Add the normal task's callbacks to the non-actor scheduling queue. normal_scheduling_queue_->Add(request.sequence_number(), request.client_processed_up_to(), accept_callback, - reject_callback, task_spec.TaskId(), dependencies); + reject_callback, dependencies); } } @@ -501,10 +501,4 @@ void CoreWorkerDirectTaskReceiver::RunNormalTasksFromQueue() { normal_scheduling_queue_->ScheduleRequests(); } -bool CoreWorkerDirectTaskReceiver::CancelQueuedNormalTask(TaskID task_id) { - // Look up the task to be canceled in the queue of normal tasks. If it is found and - // removed successfully, return true. - return normal_scheduling_queue_->CancelTaskIfFound(task_id); -} - } // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index cbd0a82fc..ab28dc85a 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -254,23 +254,19 @@ class InboundRequest { public: InboundRequest(){}; InboundRequest(std::function accept_callback, - std::function reject_callback, TaskID task_id, - bool has_dependencies) + std::function reject_callback, bool has_dependencies) : accept_callback_(accept_callback), reject_callback_(reject_callback), - task_id(task_id), has_pending_dependencies_(has_dependencies) {} void Accept() { accept_callback_(); } void Cancel() { reject_callback_(); } bool CanExecute() const { return !has_pending_dependencies_; } - ray::TaskID TaskID() const { return task_id; } void MarkDependenciesSatisfied() { has_pending_dependencies_ = false; } private: std::function accept_callback_; std::function reject_callback_; - ray::TaskID task_id; bool has_pending_dependencies_; }; @@ -350,11 +346,10 @@ class SchedulingQueue { public: virtual void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, - std::function reject_request, TaskID task_id = TaskID::Nil(), + std::function reject_request, const std::vector &dependencies = {}) = 0; virtual void ScheduleRequests() = 0; virtual bool TaskQueueEmpty() const = 0; - virtual bool CancelTaskIfFound(TaskID task_id) = 0; virtual ~SchedulingQueue(){}; }; @@ -376,7 +371,6 @@ class ActorSchedulingQueue : public SchedulingQueue { /// Add a new actor task's callbacks to the worker queue. void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, std::function reject_request, - TaskID task_id = TaskID::Nil(), const std::vector &dependencies = {}) { // A seq_no of -1 means no ordering constraint. Actor tasks must be executed in order. RAY_CHECK(seq_no != -1); @@ -389,7 +383,7 @@ class ActorSchedulingQueue : public SchedulingQueue { } RAY_LOG(DEBUG) << "Enqueue " << seq_no << " cur seqno " << next_seq_no_; pending_actor_tasks_[seq_no] = - InboundRequest(accept_request, reject_request, task_id, dependencies.size() > 0); + InboundRequest(accept_request, reject_request, dependencies.size() > 0); if (dependencies.size() > 0) { waiter_.Wait(dependencies, [seq_no, this]() { RAY_CHECK(boost::this_thread::get_id() == main_thread_id_); @@ -403,15 +397,6 @@ class ActorSchedulingQueue : public SchedulingQueue { ScheduleRequests(); } - // We don't allow the cancellation of actor tasks, so invoking CancelTaskIfFound results - // in a fatal error. - bool CancelTaskIfFound(TaskID task_id) { - RAY_CHECK(false) << "Cannot cancel actor tasks"; - // The return instruction will never be executed, but we need to include it - // nonetheless because this is a non-void function. - return false; - } - /// Schedules as many requests as possible in sequence. void ScheduleRequests() { // Only call SetMaxActorConcurrency to configure threadpool size when the @@ -535,45 +520,22 @@ class NormalSchedulingQueue : public SchedulingQueue { /// Add a new task's callbacks to the worker queue. void Add(int64_t seq_no, int64_t client_processed_up_to, std::function accept_request, std::function reject_request, - TaskID task_id = TaskID::Nil(), const std::vector &dependencies = {}) { absl::MutexLock lock(&mu_); // Normal tasks should not have ordering constraints. RAY_CHECK(seq_no == -1); // Create a InboundRequest object for the new task, and add it to the queue. pending_normal_tasks_.push_back( - InboundRequest(accept_request, reject_request, task_id, dependencies.size() > 0)); - } - - // Search for an InboundRequest associated with the task that we are trying to cancel. - // If found, remove the InboundRequest from the queue and return true. Otherwise, return - // false. - bool CancelTaskIfFound(TaskID task_id) { - absl::MutexLock lock(&mu_); - for (std::deque::reverse_iterator it = pending_normal_tasks_.rbegin(); - it != pending_normal_tasks_.rend(); ++it) { - if (it->TaskID() == task_id) { - pending_normal_tasks_.erase(std::next(it).base()); - return true; - } - } - return false; + InboundRequest(accept_request, reject_request, dependencies.size() > 0)); } /// Schedules as many requests as possible in sequence. void ScheduleRequests() { - while (true) { - InboundRequest head; - { - absl::MutexLock lock(&mu_); - if (!pending_normal_tasks_.empty()) { - head = pending_normal_tasks_.front(); - pending_normal_tasks_.pop_front(); - } else { - return; - } - } + absl::MutexLock lock(&mu_); + while (!pending_normal_tasks_.empty()) { + auto &head = pending_normal_tasks_.front(); head.Accept(); + pending_normal_tasks_.pop_front(); } } @@ -621,8 +583,6 @@ class CoreWorkerDirectTaskReceiver { /// Pop tasks from the queue and execute them sequentially void RunNormalTasksFromQueue(); - bool CancelQueuedNormalTask(TaskID task_id); - private: // Worker context. WorkerContext &worker_context_;