[Java] fix actor restart failure when multi-worker is turned on (#13793)

This commit is contained in:
Kai Yang
2021-02-07 21:12:54 +08:00
committed by GitHub
parent 1412f3c546
commit 4b4941435d
5 changed files with 88 additions and 10 deletions
@@ -3,15 +3,14 @@ package io.ray.test;
import io.ray.api.ActorHandle;
import io.ray.api.Ray;
import io.ray.runtime.exception.RayActorException;
import io.ray.runtime.exception.RayException;
import io.ray.runtime.util.SystemUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;
@Test(
groups = {"cluster"},
enabled = false)
@Test(groups = {"cluster"})
public class ActorRestartTest extends BaseTest {
public static class Counter {
@@ -58,6 +57,7 @@ public class ActorRestartTest extends BaseTest {
// Kill the actor process.
killActorProcess(actor);
waitForActorAlive(actor);
int value = actor.task(Counter::increase).remote().get();
Assert.assertEquals(value, 1);
@@ -83,4 +83,18 @@ public class ActorRestartTest extends BaseTest {
// Wait for the actor to be killed.
TimeUnit.SECONDS.sleep(1);
}
private static void waitForActorAlive(ActorHandle<Counter> actor) {
Assert.assertTrue(
TestUtils.waitForCondition(
() -> {
try {
actor.task(Counter::getPid).remote().get();
return true;
} catch (RayException e) {
return false;
}
},
10000));
}
}
+1 -1
View File
@@ -1267,7 +1267,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
}
// Remove the dead client from the pool and stop listening for messages.
worker_pool_.DisconnectWorker(worker);
worker_pool_.DisconnectWorker(worker, disconnect_type);
// Return the resources that were being used by this worker.
cluster_task_manager_->ReleaseWorkerResources(worker);
+25 -3
View File
@@ -792,7 +792,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend();
it++) {
if (task_spec.GetLanguage() != it->first->GetLanguage() ||
it->first->GetAssignedJobId() != task_spec.JobId()) {
it->first->GetAssignedJobId() != task_spec.JobId() ||
state.pending_disconnection_workers.count(it->first) > 0) {
continue;
}
state.idle.erase(it->first);
@@ -857,9 +858,12 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
}
}
bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker) {
bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker,
rpc::WorkerExitType disconnect_type) {
auto &state = GetStateForLanguage(worker->GetLanguage());
RAY_CHECK(RemoveWorker(state.registered_workers, worker));
RAY_UNUSED(RemoveWorker(state.pending_disconnection_workers, worker));
for (auto it = idle_of_all_languages_.begin(); it != idle_of_all_languages_.end();
it++) {
if (it->first == worker) {
@@ -870,7 +874,25 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker
}
MarkPortAsFree(worker->AssignedPort());
return RemoveWorker(state.idle, worker);
auto status = RemoveWorker(state.idle, worker);
if (disconnect_type != rpc::WorkerExitType::INTENDED_EXIT) {
// A Java worker process may have multiple workers. If one of them disconnects
// unintentionally (which means that the worker process has died), we remove the
// others from idle pool so that the failed actor will not be rescheduled on the same
// process.
auto pid = worker->GetProcess().GetId();
for (auto worker2 : state.registered_workers) {
if (worker2->GetProcess().GetId() == pid) {
// NOTE(kfstorm): We have to use a new field to record these workers (instead of
// just removing them from idle sets) because they may haven't announced worker
// port yet. When they announce worker port, they'll be marked idle again. So
// removing them from idle sets here doesn't really prevent them from being popped
// later.
state.pending_disconnection_workers.insert(worker2);
}
}
}
return status;
}
void WorkerPool::DisconnectDriver(const std::shared_ptr<WorkerInterface> &driver) {
+7 -2
View File
@@ -184,9 +184,11 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// Disconnect a registered worker.
///
/// \param The worker to disconnect. The worker must be registered.
/// \param worker The worker to disconnect. The worker must be registered.
/// \param disconnect_type Type of a worker exit.
/// \return Whether the given worker was in the pool of idle workers.
bool DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker);
bool DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker,
rpc::WorkerExitType disconnect_type);
/// Disconnect a registered driver.
///
@@ -367,6 +369,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::unordered_set<std::shared_ptr<WorkerInterface>> registered_workers;
/// All drivers that have registered and are still connected.
std::unordered_set<std::shared_ptr<WorkerInterface>> registered_drivers;
/// All workers that have registered but is about to disconnect. They shouldn't be
/// popped anymore.
std::unordered_set<std::shared_ptr<WorkerInterface>> pending_disconnection_workers;
/// A map from the pids of starting worker processes
/// to the number of their unregistered workers.
std::unordered_map<Process, int> starting_worker_processes;
+38 -1
View File
@@ -268,7 +268,8 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) {
// Check that there's no starting worker process
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 0);
for (const auto &worker : workers) {
worker_pool_->DisconnectWorker(worker);
worker_pool_->DisconnectWorker(
worker, /*disconnect_type=*/rpc::WorkerExitType::INTENDED_EXIT);
// Check that we cannot lookup the worker after it's disconnected.
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr);
}
@@ -710,6 +711,42 @@ TEST_F(WorkerPoolTest, DeleteWorkerPushPop) {
});
}
TEST_F(WorkerPoolTest, NoPopOnCrashedWorkerProcess) {
// Start a Java worker process.
Process proc =
worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID);
auto worker1 = CreateWorker(Process(), Language::JAVA);
auto worker2 = CreateWorker(Process(), Language::JAVA);
// We now imitate worker process crashing while core worker initializing.
// 1. we register both workers.
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker1, proc.GetId(), [](Status, int) {}));
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker2, proc.GetId(), [](Status, int) {}));
// 2. announce worker port for worker 1. When interacting with worker pool, it's
// PushWorker.
worker_pool_->PushWorker(worker1);
// 3. kill the worker process. Now let's assume that Raylet found that the connection
// with worker 1 disconnected first.
worker_pool_->DisconnectWorker(
worker1, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
// 4. but the RPC for announcing worker port for worker 2 is already in Raylet input
// buffer. So now Raylet needs to handle worker 2.
worker_pool_->PushWorker(worker2);
// 5. Let's try to pop a worker to execute a task. Worker 2 shouldn't be popped because
// the process has crashed.
const auto task_spec = ExampleTaskSpec();
ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr);
// 6. Now Raylet disconnects with worker 2.
worker_pool_->DisconnectWorker(
worker2, /*disconnect_type=*/rpc::WorkerExitType::SYSTEM_ERROR_EXIT);
}
} // namespace raylet
} // namespace ray