diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 38cb16768..7c9897558 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -199,6 +199,9 @@ class Monitor(object): elif client_type == PLASMA_MANAGER_CLIENT_TYPE: if db_client_id not in self.dead_plasma_managers: self.dead_plasma_managers.add(db_client_id) + # Stop tracking this plasma manager's heartbeats, since it's + # already dead. + del self.live_plasma_managers[db_client_id] def plasma_manager_heartbeat_handler(self, channel, data): """Handle a plasma manager heartbeat from Redis. diff --git a/python/ray/worker.py b/python/ray/worker.py index f020e7bdc..f6c63b1e9 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -820,11 +820,18 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address): # "src/common/redis_module/ray_redis_module.cc" where it is defined. REDIS_CLIENT_TABLE_PREFIX = "CL:" client_keys = redis_client.keys("{}*".format(REDIS_CLIENT_TABLE_PREFIX)) - # Filter to clients on the same node and do some basic checking. + # Filter to live clients on the same node and do some basic checking. plasma_managers = [] local_schedulers = [] for key in client_keys: info = redis_client.hgetall(key) + + # Ignore clients that were deleted. + deleted = info[b"deleted"] + deleted = bool(int(deleted)) + if deleted: + continue + assert b"ray_client_id" in info assert b"node_ip_address" in info assert b"client_type" in info @@ -833,7 +840,7 @@ def get_address_info_from_redis_helper(redis_address, node_ip_address): plasma_managers.append(info) elif info[b"client_type"].decode("ascii") == "local_scheduler": local_schedulers.append(info) - # Make sure that we got at one plasma manager and local scheduler. + # Make sure that we got at least one plasma manager and local scheduler. assert len(plasma_managers) >= 1 assert len(local_schedulers) >= 1 # Build the address information. diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index 15050dd79..3bbfacf41 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -183,9 +183,21 @@ DBHandle *db_connect(const char *db_address, } void db_disconnect(DBHandle *db) { + /* Notify others that this client is disconnecting from Redis. If a client of + * the same type on the same node wants to reconnect again, they must + * reconnect and get assigned a different client ID. */ + redisReply *reply = + (redisReply *) redisCommand(db->sync_context, "RAY.DISCONNECT %b", + db->client.id, sizeof(db->client.id)); + CHECK(strcmp(reply->str, "OK") == 0); + freeReplyObject(reply); + + /* Clean up the Redis connection state. */ redisFree(db->sync_context); redisAsyncFree(db->context); redisAsyncFree(db->sub_context); + + /* Clean up memory. */ DBClientCacheEntry *e, *tmp; HASH_ITER(hh, db->db_client_cache, e, tmp) { free(e->addr); diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 996c4801d..50354057a 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -155,6 +155,37 @@ void LocalSchedulerState_free(LocalSchedulerState *state) { * the possibility of orphan worker processes. */ signal(SIGTERM, SIG_DFL); + /* Kill any child processes that didn't register as a worker yet. */ + pid_t *worker_pid; + for (worker_pid = (pid_t *) utarray_front(state->child_pids); + worker_pid != NULL; + worker_pid = (pid_t *) utarray_next(state->child_pids, worker_pid)) { + kill(*worker_pid, SIGKILL); + waitpid(*worker_pid, NULL, 0); + LOG_DEBUG("Killed pid %d", *worker_pid); + } + utarray_free(state->child_pids); + + /* Kill any registered workers. */ + /* TODO(swang): It's possible that the local scheduler will exit before all + * of its task table updates make it to redis. */ + for (LocalSchedulerClient **worker = + (LocalSchedulerClient **) utarray_front(state->workers); + worker != NULL; + worker = (LocalSchedulerClient **) utarray_front(state->workers)) { + kill_worker(*worker, true); + } + + /* Disconnect from plasma. */ + plasma_disconnect(state->plasma_conn); + state->plasma_conn = NULL; + + /* Disconnect from the database. */ + if (state->db != NULL) { + db_disconnect(state->db); + state->db = NULL; + } + /* Free the command for starting new workers. */ if (state->config.start_worker_command != NULL) { int i = 0; @@ -168,39 +199,11 @@ void LocalSchedulerState_free(LocalSchedulerState *state) { state->config.start_worker_command = NULL; } - /* Kill any child processes that didn't register as a worker yet. */ - pid_t *worker_pid; - for (worker_pid = (pid_t *) utarray_front(state->child_pids); - worker_pid != NULL; - worker_pid = (pid_t *) utarray_next(state->child_pids, worker_pid)) { - kill(*worker_pid, SIGKILL); - waitpid(*worker_pid, NULL, 0); - LOG_DEBUG("Killed pid %d", *worker_pid); - } - utarray_free(state->child_pids); - /* Free the list of workers and any tasks that are still in progress on those * workers. */ - /* TODO(swang): It's possible that the local scheduler will exit before all - * of its task table updates make it to redis. */ - for (LocalSchedulerClient **worker = - (LocalSchedulerClient **) utarray_front(state->workers); - worker != NULL; - worker = (LocalSchedulerClient **) utarray_front(state->workers)) { - kill_worker(*worker, true); - } utarray_free(state->workers); state->workers = NULL; - /* Disconnect from the database. */ - if (state->db != NULL) { - db_disconnect(state->db); - state->db = NULL; - } - /* Disconnect from plasma. */ - plasma_disconnect(state->plasma_conn); - state->plasma_conn = NULL; - /* Free the mapping from the actor ID to the ID of the local scheduler * responsible for that actor. */ actor_map_entry *current_actor_map_entry, *temp_actor_map_entry; @@ -825,7 +828,7 @@ void new_client_connection(event_loop *loop, /* We need this code so we can clean up when we get a SIGTERM signal. */ -LocalSchedulerState *g_state; +LocalSchedulerState *g_state = NULL; void signal_handler(int signal) { LOG_DEBUG("Signal was %d", signal); @@ -834,7 +837,9 @@ void signal_handler(int signal) { * free the local scheduler state at most once. If another SIGTERM is * caught during this call, there is the possibility of orphan worker * processes. */ - LocalSchedulerState_free(g_state); + if (g_state) { + LocalSchedulerState_free(g_state); + } exit(0); } } diff --git a/src/plasma/plasma_client.cc b/src/plasma/plasma_client.cc index 8fc72fecd..7c196ef45 100644 --- a/src/plasma/plasma_client.cc +++ b/src/plasma/plasma_client.cc @@ -650,27 +650,27 @@ PlasmaConnection *plasma_connect(const char *store_socket_name, } void plasma_disconnect(PlasmaConnection *conn) { - /* Perform the pending release calls to flush out the queue so that the counts - * in the objects_in_use table are accurate. */ + /* Clean up state for objects and memory pages in use. NOTE: We purposefully + * do not finish sending release calls for objects in use, so that we don't + * duplicate plasma_release calls (when handling a SIGTERM, for example). */ pending_release *element, *temp; DL_FOREACH_SAFE(conn->release_history, element, temp) { - plasma_perform_release(conn, element->object_id); DL_DELETE(conn->release_history, element); free(element); } - /* Loop over the objects in use table and release all remaining objects. */ object_in_use_entry *current_entry, *temp_entry; HASH_ITER(hh, conn->objects_in_use, current_entry, temp_entry) { - ObjectID object_id_to_release = current_entry->object_id; - int count = current_entry->count; - for (int i = 0; i < count; ++i) { - plasma_perform_release(conn, object_id_to_release); - } + HASH_DELETE(hh, conn->objects_in_use, current_entry); + free(current_entry); + } + client_mmap_table_entry *mmap_entry, *temp_mmap_entry; + HASH_ITER(hh, conn->mmap_table, mmap_entry, temp_mmap_entry) { + HASH_DELETE(hh, conn->mmap_table, mmap_entry); + free(mmap_entry); } - /* Check that we've successfully released everything. */ - CHECKM(conn->in_use_object_bytes == 0, "conn->in_use_object_bytes = %" PRId64, - conn->in_use_object_bytes); free_protocol_builder(conn->builder); + /* Close the connections to Plasma. The Plasma store will release the objects + * that were in use by us when handling the SIGPIPE. */ close(conn->store_conn); if (conn->manager_conn >= 0) { close(conn->manager_conn); diff --git a/src/plasma/plasma_manager.cc b/src/plasma/plasma_manager.cc index 12f306c2b..2f7c89dbd 100644 --- a/src/plasma/plasma_manager.cc +++ b/src/plasma/plasma_manager.cc @@ -531,6 +531,14 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name, } void PlasmaManagerState_free(PlasmaManagerState *state) { + /* Reset the SIGTERM handler to default behavior, so we try to clean up the + * plasma manager at most once. */ + signal(SIGTERM, SIG_DFL); + if (state->db != NULL) { + db_disconnect(state->db); + state->db = NULL; + } + ClientConnection *manager_conn, *tmp_manager_conn; HASH_ITER(manager_hh, state->manager_connections, manager_conn, tmp_manager_conn) { @@ -550,6 +558,18 @@ void PlasmaManagerState_free(PlasmaManagerState *state) { free(entry); } + ObjectWaitRequests *wait_reqs, *tmp_wait_reqs; + HASH_ITER(hh, state->object_wait_requests_local, wait_reqs, tmp_wait_reqs) { + HASH_DELETE(hh, state->object_wait_requests_local, wait_reqs); + utarray_free(wait_reqs->wait_requests); + free(wait_reqs); + } + HASH_ITER(hh, state->object_wait_requests_remote, wait_reqs, tmp_wait_reqs) { + HASH_DELETE(hh, state->object_wait_requests_remote, wait_reqs); + utarray_free(wait_reqs->wait_requests); + free(wait_reqs); + } + plasma_disconnect(state->plasma_conn); event_loop_destroy(state->loop); free_protocol_builder(state->builder); @@ -1605,9 +1625,10 @@ void start_server(const char *store_socket_name, /* Report "success" to valgrind. */ void signal_handler(int signal) { + LOG_DEBUG("Signal was %d", signal); if (signal == SIGTERM) { if (g_manager_state) { - db_disconnect(g_manager_state->db); + PlasmaManagerState_free(g_manager_state); } exit(0); }