From a4d8e1309401442bfe5f3930b83a06fa4108e3a2 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Thu, 1 Jun 2017 13:10:40 -0700 Subject: [PATCH] Suppress excess warning messages related to intentional actor deaths. (#627) * Don't submit the actor destructor tasks when the job is exiting. * Don't propagate error messages to the driver when an actor exits intentionally. --- python/ray/actor.py | 6 ++++-- src/local_scheduler/format/local_scheduler.fbs | 6 ++++++ src/local_scheduler/local_scheduler.cc | 8 +++++++- src/local_scheduler/local_scheduler_client.cc | 8 ++++++++ src/local_scheduler/local_scheduler_client.h | 10 ++++++++++ src/local_scheduler/local_scheduler_extension.cc | 8 ++++++++ src/local_scheduler/local_scheduler_shared.h | 4 ++++ 7 files changed, 47 insertions(+), 3 deletions(-) diff --git a/python/ray/actor.py b/python/ray/actor.py index b20da5f05..5e2410d64 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -279,6 +279,7 @@ def make_actor(Class, num_cpus, num_gpus): # terminating the worker. class Class(Class): def __ray_terminate__(self): + ray.worker.global_worker.local_scheduler_client.disconnect() import os os._exit(0) @@ -398,8 +399,9 @@ def make_actor(Class, num_cpus, num_gpus): def __del__(self): """Kill the worker that is running this actor.""" - actor_method_call(self._ray_actor_id, "__ray_terminate__", - self._ray_method_signatures["__ray_terminate__"]) + if ray.worker.global_worker.connected: + actor_method_call(self._ray_actor_id, "__ray_terminate__", + self._ray_method_signatures["__ray_terminate__"]) return NewClass diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index bb7a49fec..c0cba6ceb 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -16,6 +16,9 @@ enum MessageType:int { // Send a reply confirming the successful registration of a worker or driver. // This is sent from the local scheduler to a worker or driver. RegisterClientReply, + // Notify the local scheduler that this client is disconnecting gracefully. + // This is sent from a worker to a local scheduler. + DisconnectClient, // Get a new task from the local scheduler. This is sent from a worker to a // local scheduler. GetTask, @@ -66,6 +69,9 @@ table RegisterClientReply { gpu_ids: [int]; } +table DisconnectClient { +} + table ReconstructObject { // Object ID of the object that needs to be reconstructed. object_id: string; diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 323994c5f..1504a0df5 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -833,7 +833,8 @@ void handle_client_disconnect(LocalSchedulerState *state, /* In this case, a driver is disconecting. */ driver_table_send_driver_death(state->db, worker->client_id, NULL); } - kill_worker(state, worker, false, false); + /* Suppress the warning message if the worker already disconnected. */ + kill_worker(state, worker, false, worker->disconnected); } void process_message(event_loop *loop, @@ -872,6 +873,10 @@ void process_message(event_loop *loop, } break; case MessageType_TaskDone: { } break; + case MessageType_DisconnectClient: { + CHECK(!worker->disconnected); + worker->disconnected = true; + } break; case MessageType_EventLogMessage: { /* Parse the message. */ auto message = flatbuffers::GetRoot( @@ -997,6 +1002,7 @@ void new_client_connection(event_loop *loop, LocalSchedulerClient *worker = new LocalSchedulerClient(); worker->sock = new_socket; worker->registered = false; + worker->disconnected = false; /* We don't know whether this is a worker or not, so just initialize is_worker * to false. */ worker->is_worker = true; diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 16e99bf16..31fb2eed9 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -61,6 +61,14 @@ void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) { delete conn; } +void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) { + flatbuffers::FlatBufferBuilder fbb; + auto message = CreateDisconnectClient(fbb); + fbb.Finish(message); + write_message(conn->conn, MessageType_DisconnectClient, fbb.GetSize(), + fbb.GetBufferPointer()); +} + void local_scheduler_log_event(LocalSchedulerConnection *conn, uint8_t *key, int64_t key_length, diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index 80c60bf57..9c218c469 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -55,6 +55,16 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, TaskSpec *task, int64_t task_size); +/** + * Notify the local scheduler that this client is disconnecting gracefully. This + * is used by actors to exit gracefully so that the local scheduler doesn't + * propagate an error message to the driver. + * + * @param conn The connection information. + * @return Void. + */ +void local_scheduler_disconnect_client(LocalSchedulerConnection *conn); + /** * Log an event to the event log. This will call RPUSH key value. We use RPUSH * instead of SET so that it is possible to flush the log multiple times with diff --git a/src/local_scheduler/local_scheduler_extension.cc b/src/local_scheduler/local_scheduler_extension.cc index 3c4fe620a..4230d0e17 100644 --- a/src/local_scheduler/local_scheduler_extension.cc +++ b/src/local_scheduler/local_scheduler_extension.cc @@ -41,6 +41,12 @@ static void PyLocalSchedulerClient_dealloc(PyLocalSchedulerClient *self) { Py_TYPE(self)->tp_free((PyObject *) self); } +static PyObject *PyLocalSchedulerClient_disconnect(PyObject *self) { + local_scheduler_disconnect_client( + ((PyLocalSchedulerClient *) self)->local_scheduler_connection); + Py_RETURN_NONE; +} + static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) { PyObject *py_task; if (!PyArg_ParseTuple(args, "O", &py_task)) { @@ -127,6 +133,8 @@ static PyObject *PyLocalSchedulerClient_gpu_ids(PyObject *self) { } static PyMethodDef PyLocalSchedulerClient_methods[] = { + {"disconnect", (PyCFunction) PyLocalSchedulerClient_disconnect, METH_NOARGS, + "Notify the local scheduler that this client is exiting gracefully."}, {"submit", (PyCFunction) PyLocalSchedulerClient_submit, METH_VARARGS, "Submit a task to the local scheduler."}, {"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_NOARGS, diff --git a/src/local_scheduler/local_scheduler_shared.h b/src/local_scheduler/local_scheduler_shared.h index b10cff72c..94627b7ab 100644 --- a/src/local_scheduler/local_scheduler_shared.h +++ b/src/local_scheduler/local_scheduler_shared.h @@ -86,6 +86,10 @@ struct LocalSchedulerClient { int sock; /** True if the client has registered and false otherwise. */ bool registered; + /** True if the client has sent a disconnect message to the local scheduler + * and false otherwise. If this is true, then the local scheduler will not + * propagate an error message to the driver when the client exits. */ + bool disconnected; /** True if the client is a worker and false if it is a driver. */ bool is_worker; /** The worker ID if the client is a worker and the driver ID if the client is