From eb7b73d73143ead0a315ef1c54833a89d0e8bc7b Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 21 Nov 2019 15:37:15 -0800 Subject: [PATCH] Disconnect direct task workers that died (#6213) * Disconnect workers that died so that we push the worker died error to redis * Push error if actor is non nil * fix test --- .../test/direct_task_transport_test.cc | 24 +++++++++++++++---- .../transport/direct_task_transport.cc | 2 +- src/ray/protobuf/node_manager.proto | 3 +++ src/ray/raylet/node_manager.cc | 20 +++++++++------- src/ray/raylet/raylet_client.cc | 3 ++- src/ray/raylet/raylet_client.h | 5 ++-- 6 files changed, 40 insertions(+), 17 deletions(-) diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 0eaed321c..024d8a250 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -23,8 +23,12 @@ class MockWorkerClient : public rpc::CoreWorkerClientInterface { class MockRayletClient : public WorkerLeaseInterface { public: - ray::Status ReturnWorker(int worker_port) override { - num_workers_returned += 1; + ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override { + if (disconnect_worker) { + num_workers_disconnected++; + } else { + num_workers_returned++; + } return Status::OK(); } @@ -63,6 +67,7 @@ class MockRayletClient : public WorkerLeaseInterface { int num_workers_requested = 0; int num_workers_returned = 0; + int num_workers_disconnected = 0; std::list> callbacks = {}; }; @@ -200,6 +205,7 @@ TEST(DirectTaskTransportTest, TestSubmitOneTask) { worker_client->callbacks[0](Status::OK(), rpc::PushTaskReply()); ASSERT_EQ(raylet_client->num_workers_returned, 1); + ASSERT_EQ(raylet_client->num_workers_disconnected, 0); } TEST(DirectTaskTransportTest, TestHandleTaskFailure) { @@ -216,7 +222,8 @@ TEST(DirectTaskTransportTest, TestHandleTaskFailure) { // Simulate a system failure, i.e., worker died unexpectedly. worker_client->callbacks[0](Status::IOError("oops"), rpc::PushTaskReply()); ASSERT_EQ(worker_client->callbacks.size(), 1); - ASSERT_EQ(raylet_client->num_workers_returned, 1); + ASSERT_EQ(raylet_client->num_workers_returned, 0); + ASSERT_EQ(raylet_client->num_workers_disconnected, 1); } TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) { @@ -257,6 +264,7 @@ TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) { cb(Status::OK(), rpc::PushTaskReply()); } ASSERT_EQ(raylet_client->num_workers_returned, 3); + ASSERT_EQ(raylet_client->num_workers_disconnected, 0); } TEST(DirectTaskTransportTest, TestReuseWorkerLease) { @@ -299,6 +307,7 @@ TEST(DirectTaskTransportTest, TestReuseWorkerLease) { // The second lease request is returned immediately. ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil())); ASSERT_EQ(raylet_client->num_workers_returned, 2); + ASSERT_EQ(raylet_client->num_workers_disconnected, 0); } TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) { @@ -324,13 +333,15 @@ TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) { // Task 1 finishes with failure; the worker is returned. worker_client->callbacks[0](Status::IOError("worker dead"), rpc::PushTaskReply()); ASSERT_EQ(worker_client->callbacks.size(), 1); - ASSERT_EQ(raylet_client->num_workers_returned, 1); + ASSERT_EQ(raylet_client->num_workers_returned, 0); + ASSERT_EQ(raylet_client->num_workers_disconnected, 1); // Task 2 runs successfully on the second worker. ASSERT_TRUE(raylet_client->GrantWorkerLease("localhost", 1001, ClientID::Nil())); ASSERT_EQ(worker_client->callbacks.size(), 2); worker_client->callbacks[1](Status::OK(), rpc::PushTaskReply()); - ASSERT_EQ(raylet_client->num_workers_returned, 2); + ASSERT_EQ(raylet_client->num_workers_returned, 1); + ASSERT_EQ(raylet_client->num_workers_disconnected, 1); } TEST(DirectTaskTransportTest, TestSpillback) { @@ -374,6 +385,9 @@ TEST(DirectTaskTransportTest, TestSpillback) { worker_client->callbacks[0](Status::OK(), rpc::PushTaskReply()); ASSERT_EQ(raylet_client->num_workers_returned, 0); ASSERT_EQ(remote_lease_clients[remote_raylet_id]->num_workers_returned, 1); + + ASSERT_EQ(raylet_client->num_workers_disconnected, 0); + ASSERT_EQ(remote_lease_clients[remote_raylet_id]->num_workers_disconnected, 0); } } // namespace ray diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index d5d1316e3..a3f1539de 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -40,7 +40,7 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(const rpc::WorkerAddress &addr, if (queued_tasks_.empty() || was_error) { auto lease_client = std::move(worker_to_lease_client_[addr]); worker_to_lease_client_.erase(addr); - RAY_CHECK_OK(lease_client->ReturnWorker(addr.second)); + RAY_CHECK_OK(lease_client->ReturnWorker(addr.second, was_error)); } else { auto &client = *client_cache_[addr]; PushNormalTask(addr, client, queued_tasks_.front()); diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 6b33a1cc8..f3c429a6d 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -30,6 +30,9 @@ message WorkerLeaseReply { message ReturnWorkerRequest { // Port of the leased worker that we are now returning. int32 worker_port = 1; + // If true, there was some unrecoverable error and the raylet should + // disconnect the worker. + bool disconnect_worker = 2; } message ReturnWorkerReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6bedea3ea..67f84aca5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1135,9 +1135,9 @@ void NodeManager::ProcessDisconnectClientMessage( } const TaskID &task_id = worker->GetAssignedTaskId(); - // If the worker was running a task, clean up the task and push an error to - // the driver, unless the worker is already dead. - if (!task_id.IsNil() && !worker->IsDead()) { + // If the worker was running a task or actor, clean up the task and push an + // error to the driver, unless the worker is already dead. + if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) { // If the worker was an actor, the task was already cleaned up in // `HandleDisconnectedActor`. if (actor_id.IsNil()) { @@ -1473,12 +1473,16 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, leased_workers_.erase(worker_port); Status status; if (worker) { - // Handle the edge case where the worker was returned before we got the - // unblock RPC by unblocking it immediately (unblock is idempotent). - if (worker->IsBlocked()) { - HandleDirectCallTaskUnblocked(worker); + if (request.disconnect_worker()) { + ProcessDisconnectClientMessage(worker->Connection()); + } else { + // Handle the edge case where the worker was returned before we got the + // unblock RPC by unblocking it immediately (unblock is idempotent). + if (worker->IsBlocked()) { + HandleDirectCallTaskUnblocked(worker); + } + HandleWorkerAvailable(worker); } - HandleWorkerAvailable(worker); } else { status = Status::Invalid("Returned worker does not exist any more"); } diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index f9e839bf4..67eea7436 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -401,9 +401,10 @@ ray::Status RayletClient::RequestWorkerLease( return grpc_client_->RequestWorkerLease(request, callback); } -ray::Status RayletClient::ReturnWorker(int worker_port) { +ray::Status RayletClient::ReturnWorker(int worker_port, bool disconnect_worker) { ray::rpc::ReturnWorkerRequest request; request.set_worker_port(worker_port); + request.set_disconnect_worker(disconnect_worker); return grpc_client_->ReturnWorker( request, [](const ray::Status &status, const ray::rpc::ReturnWorkerReply &reply) { if (!status.ok()) { diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index a28555fc0..caa85b13c 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -75,8 +75,9 @@ class WorkerLeaseInterface { /// Returns a worker to the raylet. /// \param worker_port The local port of the worker on the raylet node. + /// \param disconnect_worker Whether the raylet should disconnect the worker. /// \return ray::Status - virtual ray::Status ReturnWorker(int worker_port) = 0; + virtual ray::Status ReturnWorker(int worker_port, bool disconnect_worker) = 0; virtual ~WorkerLeaseInterface(){}; }; @@ -230,7 +231,7 @@ class RayletClient : public WorkerLeaseInterface { const ray::rpc::ClientCallback &callback) override; /// Implements WorkerLeaseInterface. - ray::Status ReturnWorker(int worker_port) override; + ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override; WorkerID GetWorkerID() const { return worker_id_; }