From 8ef835ff03eab0e1beab1d08eb2333295846bfe1 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Sat, 23 Jan 2021 13:57:30 +0800 Subject: [PATCH] Remove idle actor from worker pool. (#13523) --- src/ray/raylet/worker_pool.cc | 32 +++++++++---------------- src/ray/raylet/worker_pool.h | 2 -- src/ray/raylet/worker_pool_test.cc | 38 ++++-------------------------- 3 files changed, 16 insertions(+), 56 deletions(-) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 93a568748..4ed257f46 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -159,9 +159,8 @@ Process WorkerPool::StartWorkerProcess( return Process(); } // Either there are no workers pending registration or the worker start is being forced. - RAY_LOG(DEBUG) << "Starting new worker process, current pool has " - << state.idle_actor.size() << " actor workers, and " << state.idle.size() - << " non-actor workers"; + RAY_LOG(DEBUG) << "Starting new worker process, current pool has " << state.idle.size() + << " workers"; int workers_to_start = 1; if (dynamic_options.empty()) { @@ -625,15 +624,11 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { state.idle_dedicated_workers[task_id] = worker; } else { // The worker is not used for the actor creation task with dynamic options. - // Put the worker to the corresponding idle pool. - if (worker->GetActorId().IsNil()) { - state.idle.insert(worker); - int64_t now = current_time_ms(); - idle_of_all_languages_.emplace_back(worker, now); - idle_of_all_languages_map_[worker] = now; - } else { - state.idle_actor[worker->GetActorId()] = worker; - } + // Put the worker to the idle pool. + state.idle.insert(worker); + int64_t now = current_time_ms(); + idle_of_all_languages_.emplace_back(worker, now); + idle_of_all_languages_map_[worker] = now; } } @@ -787,7 +782,10 @@ std::shared_ptr WorkerPool::PopWorker( state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc; } } - } else if (!task_spec.IsActorTask()) { + } else if (task_spec.IsActorTask()) { + // Code path of actor task. + RAY_CHECK(false) << "Direct call shouldn't reach here."; + } else { // Code path of normal task or actor creation task without dynamic worker options. // Find an available worker which is already assigned to this job. // Try to pop the most recently pushed worker. @@ -812,14 +810,6 @@ std::shared_ptr WorkerPool::PopWorker( proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId()); } - } else { - // Code path of actor task. - const auto &actor_id = task_spec.ActorId(); - auto actor_entry = state.idle_actor.find(actor_id); - if (actor_entry != state.idle_actor.end()) { - worker = std::move(actor_entry->second); - state.idle_actor.erase(actor_entry); - } } if (worker == nullptr && proc.IsValid()) { diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 66d4b94c7..703fbf77b 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -358,8 +358,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::unordered_map> idle_dedicated_workers; /// The pool of idle non-actor workers. std::unordered_set> idle; - /// The pool of idle actor workers. - std::unordered_map> idle_actor; // States for io workers used for spilling objects. IOWorkerState spill_io_worker_state; // States for io workers used for restoring objects. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index ee8f3356b..0d2c0e314 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -343,28 +343,6 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { ASSERT_EQ(popped_worker, nullptr); } -TEST_F(WorkerPoolTest, PopActorWorker) { - // Create a worker. - auto worker = CreateWorker(Process::CreateNewDummy()); - // Add the worker to the pool. - worker_pool_->PushWorker(worker); - - // Assign an actor ID to the worker. - const auto task_spec = ExampleTaskSpec(); - auto actor = worker_pool_->PopWorker(task_spec); - auto actor_id = ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1); - actor->AssignActorId(actor_id); - worker_pool_->PushWorker(actor); - - // Check that there are no more non-actor workers. - ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr); - // Check that we can pop the actor worker. - const auto actor_task_spec = ExampleTaskSpec(actor_id); - actor = worker_pool_->PopWorker(actor_task_spec); - ASSERT_EQ(actor, worker); - ASSERT_EQ(actor->GetActorId(), actor_id); -} - TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { // Create a Python Worker, and add it to the pool auto py_worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON); @@ -428,25 +406,19 @@ TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) { worker_pool_->PushWorker(worker); } } - std::unordered_set worker_ids; for (int round = 0; round < 2; round++) { std::vector> workers; - // Pop workers for actor (creation) tasks. + // Pop workers for actor. for (auto job_id : job_ids) { - auto actor_id = ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1); - // For the first round, we pop for actor creation tasks. - // For the second round, we pop for actor tasks. - auto task_spec = - ExampleTaskSpec(round == 0 ? ActorID::Nil() : actor_id, Language::PYTHON, - job_id, round == 0 ? actor_id : ActorID::Nil()); + auto actor_creation_id = ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1); + // Pop workers for actor creation tasks. + auto task_spec = ExampleTaskSpec(/*actor_id=*/ActorID::Nil(), Language::PYTHON, + job_id, actor_creation_id); auto worker = worker_pool_->PopWorker(task_spec); ASSERT_TRUE(worker); ASSERT_EQ(worker->GetAssignedJobId(), job_id); - if (round == 0) { - worker->AssignActorId(actor_id); - } workers.push_back(worker); }