From 4b4941435d42e5b7329388de137a590f376d18bb Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Sun, 7 Feb 2021 21:12:54 +0800 Subject: [PATCH] [Java] fix actor restart failure when multi-worker is turned on (#13793) --- .../java/io/ray/test/ActorRestartTest.java | 20 ++++++++-- src/ray/raylet/node_manager.cc | 2 +- src/ray/raylet/worker_pool.cc | 28 +++++++++++-- src/ray/raylet/worker_pool.h | 9 ++++- src/ray/raylet/worker_pool_test.cc | 39 ++++++++++++++++++- 5 files changed, 88 insertions(+), 10 deletions(-) diff --git a/java/test/src/main/java/io/ray/test/ActorRestartTest.java b/java/test/src/main/java/io/ray/test/ActorRestartTest.java index 26326073c..c57f9b614 100644 --- a/java/test/src/main/java/io/ray/test/ActorRestartTest.java +++ b/java/test/src/main/java/io/ray/test/ActorRestartTest.java @@ -3,15 +3,14 @@ package io.ray.test; import io.ray.api.ActorHandle; import io.ray.api.Ray; import io.ray.runtime.exception.RayActorException; +import io.ray.runtime.exception.RayException; import io.ray.runtime.util.SystemUtil; import java.io.IOException; import java.util.concurrent.TimeUnit; import org.testng.Assert; import org.testng.annotations.Test; -@Test( - groups = {"cluster"}, - enabled = false) +@Test(groups = {"cluster"}) public class ActorRestartTest extends BaseTest { public static class Counter { @@ -58,6 +57,7 @@ public class ActorRestartTest extends BaseTest { // Kill the actor process. killActorProcess(actor); + waitForActorAlive(actor); int value = actor.task(Counter::increase).remote().get(); Assert.assertEquals(value, 1); @@ -83,4 +83,18 @@ public class ActorRestartTest extends BaseTest { // Wait for the actor to be killed. TimeUnit.SECONDS.sleep(1); } + + private static void waitForActorAlive(ActorHandle actor) { + Assert.assertTrue( + TestUtils.waitForCondition( + () -> { + try { + actor.task(Counter::getPid).remote().get(); + return true; + } catch (RayException e) { + return false; + } + }, + 10000)); + } } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d0e3be78b..9b66d0a7c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1267,7 +1267,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie } // Remove the dead client from the pool and stop listening for messages. - worker_pool_.DisconnectWorker(worker); + worker_pool_.DisconnectWorker(worker, disconnect_type); // Return the resources that were being used by this worker. cluster_task_manager_->ReleaseWorkerResources(worker); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index ff6083199..89749f2d4 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -792,7 +792,8 @@ std::shared_ptr WorkerPool::PopWorker( for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); it++) { if (task_spec.GetLanguage() != it->first->GetLanguage() || - it->first->GetAssignedJobId() != task_spec.JobId()) { + it->first->GetAssignedJobId() != task_spec.JobId() || + state.pending_disconnection_workers.count(it->first) > 0) { continue; } state.idle.erase(it->first); @@ -857,9 +858,12 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, } } -bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { +bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker, + rpc::WorkerExitType disconnect_type) { auto &state = GetStateForLanguage(worker->GetLanguage()); RAY_CHECK(RemoveWorker(state.registered_workers, worker)); + RAY_UNUSED(RemoveWorker(state.pending_disconnection_workers, worker)); + for (auto it = idle_of_all_languages_.begin(); it != idle_of_all_languages_.end(); it++) { if (it->first == worker) { @@ -870,7 +874,25 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker } MarkPortAsFree(worker->AssignedPort()); - return RemoveWorker(state.idle, worker); + auto status = RemoveWorker(state.idle, worker); + if (disconnect_type != rpc::WorkerExitType::INTENDED_EXIT) { + // A Java worker process may have multiple workers. If one of them disconnects + // unintentionally (which means that the worker process has died), we remove the + // others from idle pool so that the failed actor will not be rescheduled on the same + // process. + auto pid = worker->GetProcess().GetId(); + for (auto worker2 : state.registered_workers) { + if (worker2->GetProcess().GetId() == pid) { + // NOTE(kfstorm): We have to use a new field to record these workers (instead of + // just removing them from idle sets) because they may haven't announced worker + // port yet. When they announce worker port, they'll be marked idle again. So + // removing them from idle sets here doesn't really prevent them from being popped + // later. + state.pending_disconnection_workers.insert(worker2); + } + } + } + return status; } void WorkerPool::DisconnectDriver(const std::shared_ptr &driver) { diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 703fbf77b..ae7d1c52c 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -184,9 +184,11 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Disconnect a registered worker. /// - /// \param The worker to disconnect. The worker must be registered. + /// \param worker The worker to disconnect. The worker must be registered. + /// \param disconnect_type Type of a worker exit. /// \return Whether the given worker was in the pool of idle workers. - bool DisconnectWorker(const std::shared_ptr &worker); + bool DisconnectWorker(const std::shared_ptr &worker, + rpc::WorkerExitType disconnect_type); /// Disconnect a registered driver. /// @@ -367,6 +369,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { std::unordered_set> registered_workers; /// All drivers that have registered and are still connected. std::unordered_set> registered_drivers; + /// All workers that have registered but is about to disconnect. They shouldn't be + /// popped anymore. + std::unordered_set> pending_disconnection_workers; /// A map from the pids of starting worker processes /// to the number of their unregistered workers. std::unordered_map starting_worker_processes; diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 0d2c0e314..044dc33a2 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -268,7 +268,8 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { // Check that there's no starting worker process ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0); for (const auto &worker : workers) { - worker_pool_->DisconnectWorker(worker); + worker_pool_->DisconnectWorker( + worker, /*disconnect_type=*/rpc::WorkerExitType::INTENDED_EXIT); // Check that we cannot lookup the worker after it's disconnected. ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr); } @@ -710,6 +711,42 @@ TEST_F(WorkerPoolTest, DeleteWorkerPushPop) { }); } +TEST_F(WorkerPoolTest, NoPopOnCrashedWorkerProcess) { + // Start a Java worker process. + Process proc = + worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID); + auto worker1 = CreateWorker(Process(), Language::JAVA); + auto worker2 = CreateWorker(Process(), Language::JAVA); + + // We now imitate worker process crashing while core worker initializing. + + // 1. we register both workers. + RAY_CHECK_OK(worker_pool_->RegisterWorker(worker1, proc.GetId(), [](Status, int) {})); + RAY_CHECK_OK(worker_pool_->RegisterWorker(worker2, proc.GetId(), [](Status, int) {})); + + // 2. announce worker port for worker 1. When interacting with worker pool, it's + // PushWorker. + worker_pool_->PushWorker(worker1); + + // 3. kill the worker process. Now let's assume that Raylet found that the connection + // with worker 1 disconnected first. + worker_pool_->DisconnectWorker( + worker1, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT); + + // 4. but the RPC for announcing worker port for worker 2 is already in Raylet input + // buffer. So now Raylet needs to handle worker 2. + worker_pool_->PushWorker(worker2); + + // 5. Let's try to pop a worker to execute a task. Worker 2 shouldn't be popped because + // the process has crashed. + const auto task_spec = ExampleTaskSpec(); + ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr); + + // 6. Now Raylet disconnects with worker 2. + worker_pool_->DisconnectWorker( + worker2, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT); +} + } // namespace raylet } // namespace ray