Suppress errors when worker or driver intentionally disconnects. (#2935)

This commit is contained in:
Richard Liaw
2018-10-04 00:06:35 -07:00
committed by Robert Nishihara
parent f2dbd3096c
commit 01bb073569
6 changed files with 56 additions and 17 deletions
@@ -17,9 +17,12 @@ enum MessageType:int {
// Send a reply confirming the successful registration of a worker or driver.
// This is sent from the local scheduler to a worker or driver.
RegisterClientReply,
// Notify the local scheduler that this client is disconnecting gracefully.
// Notify the local scheduler that this client disconnected unexpectedly.
// This is sent from a worker to a local scheduler.
DisconnectClient,
// Notify the local scheduler that this client is disconnecting gracefully.
// This is sent from a worker to a local scheduler.
IntentionalDisconnectClient,
// Get a new task from the local scheduler. This is sent from a worker to a
// local scheduler.
GetTask,
@@ -56,8 +56,15 @@ void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::local_scheduler::protocol::CreateDisconnectClient(fbb);
fbb.Finish(message);
write_message(conn->conn, static_cast<int64_t>(MessageType::DisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
if (conn->use_raylet) {
write_message(conn->conn, static_cast<int64_t>(
MessageType::IntentionalDisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
} else {
write_message(conn->conn,
static_cast<int64_t>(MessageType::DisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
}
void local_scheduler_log_event(LocalSchedulerConnection *conn,
+5 -2
View File
@@ -23,9 +23,12 @@ enum MessageType:int {
// Send a reply confirming the successful registration of a worker or driver.
// This is sent from the local scheduler to a worker or driver.
RegisterClientReply,
// Notify the local scheduler that this client is disconnecting gracefully.
// Notify the local scheduler that this client is disconnecting unexpectedly.
// This is sent from a worker to a local scheduler.
DisconnectClient,
// Notify the local scheduler that this client is disconnecting gracefully.
// This is sent from a worker to a local scheduler.
IntentionalDisconnectClient,
// Get a new task from the local scheduler. This is sent from a worker to a
// local scheduler.
GetTask,
@@ -183,7 +186,7 @@ table PushErrorRequest {
}
table FreeObjectsRequest {
// Whether to keep this request with the local object store
// Whether to keep this request with the local object store
// or send it to all the object stores.
local_only: bool;
// List of object ids we'll delete from object store.
+23 -11
View File
@@ -26,6 +26,8 @@ RAY_CHECK_ENUM(protocol::MessageType::RegisterClientReply,
local_scheduler_protocol::MessageType::RegisterClientReply);
RAY_CHECK_ENUM(protocol::MessageType::DisconnectClient,
local_scheduler_protocol::MessageType::DisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType::IntentionalDisconnectClient,
local_scheduler_protocol::MessageType::IntentionalDisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType::GetTask,
local_scheduler_protocol::MessageType::GetTask);
RAY_CHECK_ENUM(protocol::MessageType::ExecuteTask,
@@ -539,18 +541,19 @@ void NodeManager::ProcessClientMessage(
RAY_LOG(DEBUG) << "Message of type " << message_type;
auto registered_worker = worker_pool_.GetRegisteredWorker(client);
auto message_type_value = static_cast<protocol::MessageType>(message_type);
if (registered_worker && registered_worker->IsDead()) {
// For a worker that is marked as dead (because the driver has died already),
// all the messages are ignored except DisconnectClient and
// IntentionalDisconnectClient.
if (static_cast<protocol::MessageType>(message_type) !=
protocol::MessageType::DisconnectClient) {
if ((message_type_value != protocol::MessageType::DisconnectClient) &&
(message_type_value != protocol::MessageType::IntentionalDisconnectClient)) {
// Listen for more messages.
client->ProcessMessages();
return;
}
}
switch (static_cast<protocol::MessageType>(message_type)) {
switch (message_type_value) {
case protocol::MessageType::RegisterClientRequest: {
ProcessRegisterClientRequestMessage(client, message_data);
} break;
@@ -563,6 +566,12 @@ void NodeManager::ProcessClientMessage(
// because it's already disconnected.
return;
} break;
case protocol::MessageType::IntentionalDisconnectClient: {
ProcessDisconnectClientMessage(client, /* push_warning = */ false);
// We don't need to receive future messages from this client,
// because it's already disconnected.
return;
} break;
case protocol::MessageType::SubmitTask: {
ProcessSubmitTaskMessage(message_data);
} break;
@@ -638,7 +647,7 @@ void NodeManager::ProcessGetTaskMessage(
}
void NodeManager::ProcessDisconnectClientMessage(
const std::shared_ptr<LocalClientConnection> &client) {
const std::shared_ptr<LocalClientConnection> &client, bool push_warning) {
const std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
const std::shared_ptr<Worker> driver = worker_pool_.GetRegisteredDriver(client);
// This client can't be a worker and a driver.
@@ -678,13 +687,16 @@ void NodeManager::ProcessDisconnectClientMessage(
TreatTaskAsFailed(spec);
const JobID &job_id = worker->GetAssignedDriverId();
// TODO(rkn): Define this constant somewhere else.
std::string type = "worker_died";
std::ostringstream error_message;
error_message << "A worker died or was killed while executing task " << task_id
<< ".";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
job_id, type, error_message.str(), current_time_ms()));
if (push_warning) {
// TODO(rkn): Define this constant somewhere else.
std::string type = "worker_died";
std::ostringstream error_message;
error_message << "A worker died or was killed while executing task " << task_id
<< ".";
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
job_id, type, error_message.str(), current_time_ms()));
}
}
worker_pool_.DisconnectWorker(worker);
+2 -1
View File
@@ -286,9 +286,10 @@ class NodeManager {
/// client.
///
/// \param client The client that sent the message.
/// \param push_warning Propagate error message if true.
/// \return Void.
void ProcessDisconnectClientMessage(
const std::shared_ptr<LocalClientConnection> &client);
const std::shared_ptr<LocalClientConnection> &client, bool push_warning = true);
/// Process client message of SubmitTask
///
+13
View File
@@ -371,6 +371,19 @@ def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
ray.get(task2)
def test_actor_scope_or_intentionally_killed_message(ray_start_regular):
    """Check that no error is reported after an actor handle is rebound and
    an actor is explicitly terminated.

    The test creates two actors under the same local name, terminates the
    second one through ``__ray_terminate__``, waits one second, and then
    asserts that ``ray.error_info()`` is empty.
    """

    @ray.remote
    class Actor(object):
        pass

    # Bind the name twice on purpose: the second assignment replaces the
    # first handle (presumably letting the first actor go out of scope --
    # confirm against Ray's actor handle semantics).
    actor_handle = Actor.remote()
    actor_handle = Actor.remote()

    # Ask the remaining actor to terminate itself.
    actor_handle.__ray_terminate__.remote()

    # Wait before inspecting the error table.
    time.sleep(1)

    reported_error_count = len(ray.error_info())
    assert reported_error_count == 0, (
        "Should not have propogated an error - {}".format(ray.error_info()))
@pytest.fixture
def ray_start_object_store_memory():
# Start the Ray processes.