From 01bb0735698c4fffde78f90e05f96f1ebd43c8f9 Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Oct 2018 00:06:35 -0700 Subject: [PATCH] Suppress errors when worker or driver intentionally disconnects. (#2935) --- .../format/local_scheduler.fbs | 5 ++- src/local_scheduler/local_scheduler_client.cc | 11 ++++-- src/ray/raylet/format/node_manager.fbs | 7 ++-- src/ray/raylet/node_manager.cc | 34 +++++++++++++------ src/ray/raylet/node_manager.h | 3 +- test/failure_test.py | 13 +++++++ 6 files changed, 56 insertions(+), 17 deletions(-) diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index ffdf13d6a..a23bb28f0 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -17,9 +17,12 @@ enum MessageType:int { // Send a reply confirming the successful registration of a worker or driver. // This is sent from the local scheduler to a worker or driver. RegisterClientReply, - // Notify the local scheduler that this client is disconnecting gracefully. + // Notify the local scheduler that this client disconnected unexpectedly. // This is sent from a worker to a local scheduler. DisconnectClient, + // Notify the local scheduler that this client is disconnecting gracefully. + // This is sent from a worker to a local scheduler. + IntentionalDisconnectClient, // Get a new task from the local scheduler. This is sent from a worker to a // local scheduler. 
GetTask, diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 91b5fa9c9..09bda7f5b 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -56,8 +56,15 @@ void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) { flatbuffers::FlatBufferBuilder fbb; auto message = ray::local_scheduler::protocol::CreateDisconnectClient(fbb); fbb.Finish(message); - write_message(conn->conn, static_cast(MessageType::DisconnectClient), - fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); + if (conn->use_raylet) { + write_message(conn->conn, static_cast( + MessageType::IntentionalDisconnectClient), + fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); + } else { + write_message(conn->conn, + static_cast(MessageType::DisconnectClient), + fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex); + } } void local_scheduler_log_event(LocalSchedulerConnection *conn, diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 4ede8f2b3..72f934a72 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -23,9 +23,12 @@ enum MessageType:int { // Send a reply confirming the successful registration of a worker or driver. // This is sent from the local scheduler to a worker or driver. RegisterClientReply, - // Notify the local scheduler that this client is disconnecting gracefully. + // Notify the local scheduler that this client is disconnecting unexpectedly. // This is sent from a worker to a local scheduler. DisconnectClient, + // Notify the local scheduler that this client is disconnecting gracefully. + // This is sent from a worker to a local scheduler. + IntentionalDisconnectClient, // Get a new task from the local scheduler. This is sent from a worker to a // local scheduler. 
GetTask, @@ -183,7 +186,7 @@ table PushErrorRequest { } table FreeObjectsRequest { - // Whether keep this request with local object store + // Whether keep this request with local object store // or send it to all the object stores. local_only: bool; // List of object ids we'll delete from object store. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fcc60e030..2d0bcf861 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -26,6 +26,8 @@ RAY_CHECK_ENUM(protocol::MessageType::RegisterClientReply, local_scheduler_protocol::MessageType::RegisterClientReply); RAY_CHECK_ENUM(protocol::MessageType::DisconnectClient, local_scheduler_protocol::MessageType::DisconnectClient); +RAY_CHECK_ENUM(protocol::MessageType::IntentionalDisconnectClient, + local_scheduler_protocol::MessageType::IntentionalDisconnectClient); RAY_CHECK_ENUM(protocol::MessageType::GetTask, local_scheduler_protocol::MessageType::GetTask); RAY_CHECK_ENUM(protocol::MessageType::ExecuteTask, @@ -539,18 +541,19 @@ void NodeManager::ProcessClientMessage( RAY_LOG(DEBUG) << "Message of type " << message_type; auto registered_worker = worker_pool_.GetRegisteredWorker(client); + auto message_type_value = static_cast(message_type); if (registered_worker && registered_worker->IsDead()) { // For a worker that is marked as dead (because the driver has died already), // all the messages are ignored except DisconnectClient. - if (static_cast(message_type) != - protocol::MessageType::DisconnectClient) { + if ((message_type_value != protocol::MessageType::DisconnectClient) && + (message_type_value != protocol::MessageType::IntentionalDisconnectClient)) { // Listen for more messages. 
client->ProcessMessages(); return; } } - switch (static_cast(message_type)) { + switch (message_type_value) { case protocol::MessageType::RegisterClientRequest: { ProcessRegisterClientRequestMessage(client, message_data); } break; @@ -563,6 +566,12 @@ void NodeManager::ProcessClientMessage( // because it's already disconnected. return; } break; + case protocol::MessageType::IntentionalDisconnectClient: { + ProcessDisconnectClientMessage(client, /* push_warning = */ false); + // We don't need to receive future messages from this client, + // because it's already disconnected. + return; + } break; case protocol::MessageType::SubmitTask: { ProcessSubmitTaskMessage(message_data); } break; @@ -638,7 +647,7 @@ void NodeManager::ProcessGetTaskMessage( } void NodeManager::ProcessDisconnectClientMessage( - const std::shared_ptr &client) { + const std::shared_ptr &client, bool push_warning) { const std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); const std::shared_ptr driver = worker_pool_.GetRegisteredDriver(client); // This client can't be a worker and a driver. @@ -678,13 +687,16 @@ void NodeManager::ProcessDisconnectClientMessage( TreatTaskAsFailed(spec); const JobID &job_id = worker->GetAssignedDriverId(); - // TODO(rkn): Define this constant somewhere else. - std::string type = "worker_died"; - std::ostringstream error_message; - error_message << "A worker died or was killed while executing task " << task_id - << "."; - RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( - job_id, type, error_message.str(), current_time_ms())); + + if (push_warning) { + // TODO(rkn): Define this constant somewhere else. 
+ std::string type = "worker_died"; + std::ostringstream error_message; + error_message << "A worker died or was killed while executing task " << task_id + << "."; + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + job_id, type, error_message.str(), current_time_ms())); + } } worker_pool_.DisconnectWorker(worker); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index b6b23223c..e3d2ca141 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -286,9 +286,10 @@ class NodeManager { /// client. /// /// \param client The client that sent the message. + /// \param push_warning Propagate the error message to the driver if true. /// \return Void. void ProcessDisconnectClientMessage( - const std::shared_ptr &client); + const std::shared_ptr &client, bool push_warning = true); /// Process client message of SubmitTask /// diff --git a/test/failure_test.py b/test/failure_test.py index 149d4c74d..3f631ae59 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -371,6 +371,19 @@ def test_actor_worker_dying_nothing_in_progress(ray_start_regular): ray.get(task2) +def test_actor_scope_or_intentionally_killed_message(ray_start_regular): + @ray.remote + class Actor(object): + pass + + a = Actor.remote() + a = Actor.remote() + a.__ray_terminate__.remote() + time.sleep(1) + assert len(ray.error_info()) == 0, ( + "Should not have propogated an error - {}".format(ray.error_info())) + + @pytest.fixture def ray_start_object_store_memory(): # Start the Ray processes.