From 303d0fed3ee41ad2a2b99e40ec72002c4f6c0963 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 17 Jan 2017 20:34:31 -0800 Subject: [PATCH] Prevent plasma store and manager from dying when a client dies. (#203) * Prevent plasma store and manager from dying when a worker dies. * Check errno inside of warn_if_sigpipe. Passing in errno doesn't work because the arguments to warn_if_sigpipe can be evaluated out of order. --- .travis.yml | 1 + python/ray/services.py | 124 ++++++++++++++---------- src/global_scheduler/global_scheduler.c | 2 +- src/photon/photon_scheduler.c | 17 +++- src/plasma/plasma.c | 14 +++ src/plasma/plasma.h | 17 ++++ src/plasma/plasma_manager.c | 43 +++++--- src/plasma/plasma_store.c | 110 +++++++++++++++------ test/component_failures_test.py | 68 +++++++++++++ 9 files changed, 299 insertions(+), 97 deletions(-) create mode 100644 test/component_failures_test.py diff --git a/.travis.yml b/.travis.yml index cd406694b..aae302fce 100644 --- a/.travis.yml +++ b/.travis.yml @@ -76,3 +76,4 @@ script: - python test/failure_test.py - python test/microbenchmarks.py - python test/stress_tests.py + - python test/component_failures_test.py diff --git a/python/ray/services.py b/python/ray/services.py index ec76eb8a0..878afea9b 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -12,17 +12,31 @@ import string import subprocess import sys import time -from collections import namedtuple +from collections import namedtuple, OrderedDict # Ray modules import photon import plasma import global_scheduler -# all_processes is a list of the scheduler, object store, and worker processes -# that have been started by this services module if Ray is being used in local -# mode. 
-all_processes = [] +PROCESS_TYPE_WORKER = "worker" +PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler" +PROCESS_TYPE_PLASMA_MANAGER = "plasma_manager" +PROCESS_TYPE_PLASMA_STORE = "plasma_store" +PROCESS_TYPE_GLOBAL_SCHEDULER = "global_scheduler" +PROCESS_TYPE_REDIS_SERVER = "redis_server" + +# This is a dictionary tracking all of the processes of different types that +# have been started by this services module. Note that the order of the keys is +# important because it determines the order in which these processes will be +# terminated when Ray exits, and certain orders will cause errors to be logged +# to the screen. +all_processes = OrderedDict([(PROCESS_TYPE_WORKER, []), + (PROCESS_TYPE_LOCAL_SCHEDULER, []), + (PROCESS_TYPE_PLASMA_MANAGER, []), + (PROCESS_TYPE_PLASMA_STORE, []), + (PROCESS_TYPE_GLOBAL_SCHEDULER, []), + (PROCESS_TYPE_REDIS_SERVER, [])]) # True if processes are run in the valgrind profiler. RUN_PHOTON_PROFILER = False @@ -54,6 +68,35 @@ def new_port(): def random_name(): return str(random.randint(0, 99999999)) +def kill_process(p): + """Kill a process. + + Args: + p: The process to kill. + + Returns: + True if the process was killed successfully and false otherwise. + """ + if p.poll() is not None: # process has already terminated + return True + if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: + os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data. + time.sleep(0.1) # Wait for profiling data to be written. + p.kill() + # Sleeping for 0 should yield the core and allow the killed process to process + # its pending signals. + time.sleep(0) + if p.poll() is not None: + return True + p.terminate() + # Sleeping for 0 should yield the core and allow the killed process to process + # its pending signals. + time.sleep(0) + if p.poll is not None: + return True + # The process was not killed for some reason. + return False + def cleanup(): """When running in local mode, shutdown the Ray processes. 
@@ -62,33 +105,33 @@ def cleanup(): processes that were started by this services module. Driver processes are started and disconnected by worker.py. """ - global all_processes successfully_shut_down = True # Terminate the processes in reverse order. - for p in all_processes[::-1]: - if p.poll() is not None: # process has already terminated - continue - if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: - os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data. - time.sleep(0.1) # Wait for profiling data to be written. - p.kill() - time.sleep(0.05) # is this necessary? - if p.poll() is not None: - continue - p.terminate() - time.sleep(0.05) # is this necessary? - if p.poll is not None: - continue - successfully_shut_down = False + for process_type in all_processes.keys(): + # Kill all of the processes of a certain type. + for p in all_processes[process_type]: + success = kill_process(p) + successfully_shut_down = successfully_shut_down and success + # Reset the list of processes of this type. + all_processes[process_type] = [] if successfully_shut_down: if len(all_processes) > 0: print("Successfully shut down Ray.") else: print("Ray did not shut down properly.") - all_processes = [] -def all_processes_alive(): - return all([p.poll() is None for p in all_processes]) +def all_processes_alive(exclude=[]): + """Check if all of the processes are still alive. + + Args: + exclude: Don't check the processes whose types are in this list. + """ + for process_type, processes in all_processes.items(): + # Note that p.poll() returns the exit code that the process exited with, so + # an exit code of None indicates that the process is still alive. + if not all([p.poll() is None for p in processes]) and process_type not in exclude: + return False + return True def get_node_ip_address(address="8.8.8.8:53"): """Determine the IP address of the local node. 
@@ -172,7 +215,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F # not exit within 0.1 seconds). if p.poll() is None: if cleanup: - all_processes.append(p) + all_processes[PROCESS_TYPE_REDIS_SERVER].append(p) break counter += 1 if counter == num_retries: @@ -204,7 +247,7 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): """ p = global_scheduler.start_global_scheduler(redis_address, redirect_output=redirect_output) if cleanup: - all_processes.append(p) + all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p) def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True, redirect_output=False): """Start a local scheduler process. @@ -227,7 +270,7 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, pla """ local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, redirect_output=redirect_output) if cleanup: - all_processes.append(p) + all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) return local_scheduler_name def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output=False, objstore_memory=None): @@ -273,8 +316,8 @@ def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output # Start the plasma manager. 
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output) if cleanup: - all_processes.append(p1) - all_processes.append(p2) + all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1) + all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2) return ObjectStoreAddress(plasma_store_name, plasma_manager_name, plasma_manager_port) @@ -309,28 +352,7 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name, stderr = FNULL if redirect_output else None p = subprocess.Popen(command, stdout=stdout, stderr=stderr) if cleanup: - all_processes.append(p) - -def start_webui(redis_port, cleanup=True, redirect_output=False): - """This method starts the web interface. - - Args: - redis_port (int): The redis server's port - cleanup (bool): True if using Ray in local mode. If cleanup is true, then - this process will be killed by services.cleanup() when the Python process - that imported services exits. This is True by default. - redirect_output (bool): True if stdout and stderr should be redirected to - /dev/null. 
- """ - executable = "nodejs" if sys.platform == "linux" or sys.platform == "linux2" else "node" - command = [executable, os.path.join(os.path.abspath(os.path.dirname(__file__)), "../webui/index.js"), str(redis_port)] - with open("/tmp/webui_out.txt", "wb") as out: - with open(os.devnull, "w") as FNULL: - stdout = FNULL if redirect_output else out - stderr = FNULL if redirect_output else None - p = subprocess.Popen(command, stdout=stdout, stderr=stderr) - if cleanup: - all_processes.append(p) + all_processes[PROCESS_TYPE_WORKER].append(p) def start_ray_processes(address_info=None, node_ip_address="127.0.0.1", diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c index d3fdb5984..c566334e4 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.c @@ -237,7 +237,7 @@ void local_scheduler_table_handler(db_client_id client_id, UNUSED(id_string); LOG_DEBUG( "total workers = %d, task queue length = %d, available workers = %d", - info.num_total_workers, info.task_queue_length, info.available_workers); + info.total_num_workers, info.task_queue_length, info.available_workers); /* Update the local scheduler info struct. 
*/ local_scheduler *local_scheduler_ptr = get_local_scheduler(state, client_id); if (local_scheduler_ptr != NULL) { diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 67b9ba1c1..5edf076b7 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -114,7 +114,19 @@ void assign_task_to_worker(local_scheduler_state *state, bool from_global_scheduler) { CHECK(worker_index < utarray_len(state->workers)); worker *w = (worker *) utarray_eltptr(state->workers, worker_index); - write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), (uint8_t *) spec); + if (write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), + (uint8_t *) spec) < 0) { + if (errno == EPIPE || errno == EBADF) { + /* TODO(rkn): If this happens, the task should be added back to the task + * queue. */ + LOG_WARN( + "Failed to give task to worker on fd %d. The client may have hung " + "up.", + w->sock); + } else { + LOG_FATAL("Failed to give task to client on fd %d.", w->sock); + } + } /* Update the global task table. */ if (state->db != NULL) { task *task = @@ -350,6 +362,9 @@ void start_server(const char *node_ip_address, const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists) { + /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write + * to a client that has already died, the local scheduler could die. 
*/ + signal(SIGPIPE, SIG_IGN); int fd = bind_ipc_sock(socket_name, true); event_loop *loop = event_loop_create(); g_state = init_local_scheduler( diff --git a/src/plasma/plasma.c b/src/plasma/plasma.c index 4e3eb0c3b..2427284e9 100644 --- a/src/plasma/plasma.c +++ b/src/plasma/plasma.c @@ -17,3 +17,17 @@ bool plasma_object_ids_distinct(int num_object_ids, object_id object_ids[]) { } return true; } + +void warn_if_sigpipe(int status, int client_sock) { + if (status >= 0) { + return; + } + if (errno == EPIPE || errno == EBADF) { + LOG_WARN( + "Received SIGPIPE or BAD FILE DESCRIPTOR when sending a message to " + "client on fd %d. The client on the other end may have hung up.", + client_sock); + return; + } + LOG_FATAL("Failed to write message to client on fd %d.", client_sock); +} diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 0ef7b2084..4df29b8ad 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -135,4 +135,21 @@ typedef struct { */ bool plasma_object_ids_distinct(int num_object_ids, object_id object_ids[]); +/** + * Print a warning if the status is less than zero. This should be used to check + * the success of messages sent to plasma clients. We print a warning instead of + * failing because the plasma clients are allowed to die. This is used to handle + * situations where the store writes to a client file descriptor, and the client + * may already have disconnected. If we have processed the disconnection and + * closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we + * have not, then we should get a SIGPIPE. + * + * @param status The status to check. If it is less than zero, we will + * print a warning. + * @param client_sock The client socket. This is just used to print some extra + * information. + * @return Void. 
+ */ +void warn_if_sigpipe(int status, int client_sock); + #endif /* PLASMA_H */ diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 277944a71..0ef839adf 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -357,9 +357,10 @@ void remove_wait_request(plasma_manager_state *manager_state, void return_from_wait(plasma_manager_state *manager_state, wait_request *wait_req) { /* Send the reply to the client. */ - CHECK(plasma_send_WaitReply(wait_req->client_conn->fd, manager_state->builder, - wait_req->object_requests, - wait_req->num_object_requests) >= 0); + warn_if_sigpipe(plasma_send_WaitReply( + wait_req->client_conn->fd, manager_state->builder, + wait_req->object_requests, wait_req->num_object_requests), + wait_req->client_conn->fd); /* Remove the wait request from each of the relevant object_wait_requests hash * tables if it is present there. */ for (int i = 0; i < wait_req->num_object_requests; ++i) { @@ -580,16 +581,20 @@ void send_queued_request(event_loop *loop, plasma_request_buffer *buf = conn->transfer_queue; switch (buf->type) { case MessageType_PlasmaDataRequest: - CHECK(plasma_send_DataRequest(conn->fd, state->builder, buf->object_id, - state->addr, state->port) >= 0); + warn_if_sigpipe( + plasma_send_DataRequest(conn->fd, state->builder, buf->object_id, + state->addr, state->port), + conn->fd); break; case MessageType_PlasmaDataReply: LOG_DEBUG("Transferring object to manager"); if (conn->cursor == 0) { /* If the cursor is zero, we haven't sent any requests for this object * yet, so send the initial data request. 
*/ - CHECK(plasma_send_DataReply(conn->fd, state->builder, buf->object_id, - buf->data_size, buf->metadata_size) >= 0); + warn_if_sigpipe( + plasma_send_DataReply(conn->fd, state->builder, buf->object_id, + buf->data_size, buf->metadata_size), + conn->fd); } write_object_chunk(conn, buf); break; @@ -1169,9 +1174,10 @@ void request_status_done(object_id object_id, client_connection *client_conn = (client_connection *) context; int status = request_status(object_id, manager_count, manager_vector, context); - CHECK(plasma_send_StatusReply(client_conn->fd, - client_conn->manager_state->builder, &object_id, - &status, 1) >= 0); + warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd, + client_conn->manager_state->builder, + &object_id, &status, 1), + client_conn->fd); } int request_status(object_id object_id, @@ -1204,17 +1210,19 @@ void process_status_request(client_connection *client_conn, /* Return success immediately if we already have this object. */ if (is_object_local(client_conn->manager_state, object_id)) { int status = ObjectStatus_Local; - CHECK(plasma_send_StatusReply(client_conn->fd, - client_conn->manager_state->builder, - &object_id, &status, 1) >= 0); + warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd, + client_conn->manager_state->builder, + &object_id, &status, 1), + client_conn->fd); return; } if (client_conn->manager_state->db == NULL) { int status = ObjectStatus_Nonexistent; - CHECK(plasma_send_StatusReply(client_conn->fd, - client_conn->manager_state->builder, - &object_id, &status, 1) >= 0); + warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd, + client_conn->manager_state->builder, + &object_id, &status, 1), + client_conn->fd); return; } @@ -1427,6 +1435,9 @@ void start_server(const char *store_socket_name, int port, const char *db_addr, int db_port) { + /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write + * to a client that has already died, the manager could die. 
*/ + signal(SIGPIPE, SIG_IGN); /* Bind the sockets before we try to connect to the plasma store. * In case the bind does not succeed, we want to be able to exit * without breaking the pipe to the store. */ diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index d9c034e5a..22bb5854c 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -353,9 +353,34 @@ void seal_object(client *client_context, /* Send notifications to the clients that were waiting for this object. */ for (int i = 0; i < utarray_len(notify_entry->waiting_clients); ++i) { client **c = (client **) utarray_eltptr(notify_entry->waiting_clients, i); - CHECK(plasma_send_GetReply((*c)->sock, plasma_state->builder, &object_id, - &object, 1) >= 0); - CHECK(send_fd((*c)->sock, object.handle.store_fd) >= 0); + int status; + /* Send the get reply to the client. Handle errors on the write. If the + * client has hung up, that's ok. */ + if (plasma_send_GetReply((*c)->sock, plasma_state->builder, &object_id, + &object, 1) < 0) { + if (errno == EPIPE || errno == EBADF) { + LOG_WARN( + "Failed to send a message to client on fd %d. The client may " + "have hung up.", + (*c)->sock); + continue; + } else { + LOG_FATAL("Failed to send a message to client on fd %d.", (*c)->sock); + } + } + /* Send the object's file descriptor to the client. Handle errors on the + * write. If the client has hung up, that's ok. */ + if (send_fd((*c)->sock, object.handle.store_fd) < 0) { + if (errno == EPIPE || errno == EBADF) { + LOG_WARN( + "Failed to send a message to client on fd %d. The client may " + "have hung up.", + (*c)->sock); + continue; + } else { + LOG_FATAL("Failed to send a message to client on fd %d.", (*c)->sock); + } + } /* Record that the client is using this object. 
*/ add_client_to_object_clients(entry, *c); } @@ -458,7 +483,7 @@ void send_notifications(event_loop *loop, send_notifications, plasma_state); break; } else { - CHECKM(0, "This code should be unreachable."); + LOG_WARN("Failed to send notification to client on fd %d", client_sock); } num_processed += 1; } @@ -474,8 +499,14 @@ void send_notifications(event_loop *loop, void subscribe_to_updates(client *client_context, int conn) { LOG_DEBUG("subscribing to updates"); plasma_store_state *plasma_state = client_context->plasma_state; + /* TODO(rkn): The store could block here if the client doesn't send a file + * descriptor. */ int fd = recv_fd(conn); - CHECK(fd >= 0); + if (fd < 0) { + /* This may mean that the client died before sending the file descriptor. */ + LOG_WARN("Failed to receive file descriptor from client on fd %d.", conn); + return; + } CHECKM(HASH_CNT(handle, plasma_state->plasma_store_info->objects) == 0, "plasma_subscribe should be called before any objects are created."); /* Create a new array to buffer notifications that can't be sent to the @@ -515,19 +546,24 @@ void process_message(event_loop *loop, &metadata_size); int error_code = create_object(client_context, object_ids[0], data_size, metadata_size, &objects[0]); - CHECK(plasma_send_CreateReply(client_sock, state->builder, object_ids[0], - &objects[0], error_code) >= 0); + warn_if_sigpipe( + plasma_send_CreateReply(client_sock, state->builder, object_ids[0], + &objects[0], error_code), + client_sock); if (error_code == PlasmaError_OK) { - CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0); + warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd), + client_sock); } } break; case MessageType_PlasmaGetRequest: { plasma_read_GetRequest(input, object_ids, 1); if (get_object(client_context, client_sock, object_ids[0], &objects[0]) == OBJECT_FOUND) { - CHECK(plasma_send_GetReply(client_sock, state->builder, object_ids, - objects, 1) >= 0); - CHECK(send_fd(client_sock, 
objects[0].handle.store_fd) >= 0); + warn_if_sigpipe(plasma_send_GetReply(client_sock, state->builder, + object_ids, objects, 1), + client_sock); + warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd), + client_sock); } } break; case MessageType_PlasmaGetLocalRequest: { @@ -535,13 +571,19 @@ void process_message(event_loop *loop, if (get_object_local(client_context, client_sock, object_ids[0], &objects[0]) == OBJECT_FOUND) { int has_object = 1; - CHECK(plasma_send_GetLocalReply(client_sock, state->builder, object_ids, - objects, &has_object, 1) >= 0); - CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0); + warn_if_sigpipe( + plasma_send_GetLocalReply(client_sock, state->builder, object_ids, + objects, &has_object, 1), + client_sock); + warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd), + client_sock); } else { int has_object = 0; - CHECK(plasma_send_GetLocalReply(client_sock, state->builder, object_ids, - objects, &has_object, 1) >= 0); + + warn_if_sigpipe( + plasma_send_GetLocalReply(client_sock, state->builder, object_ids, + objects, &has_object, 1), + client_sock); } } break; case MessageType_PlasmaReleaseRequest: @@ -551,11 +593,13 @@ void process_message(event_loop *loop, case MessageType_PlasmaContainsRequest: plasma_read_ContainsRequest(input, &object_ids[0]); if (contains_object(client_context, object_ids[0]) == OBJECT_FOUND) { - CHECK(plasma_send_ContainsReply(client_sock, state->builder, - object_ids[0], 1) >= 0); + warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder, + object_ids[0], 1), + client_sock); } else { - CHECK(plasma_send_ContainsReply(client_sock, state->builder, - object_ids[0], 0) >= 0); + warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder, + object_ids[0], 0), + client_sock); } break; case MessageType_PlasmaSealRequest: { @@ -575,19 +619,21 @@ void process_message(event_loop *loop, &num_objects_to_evict, &objects_to_evict); remove_objects(client_context->plasma_state, 
num_objects_to_evict, objects_to_evict); - CHECK(plasma_send_EvictReply(client_sock, state->builder, - num_bytes_evicted) >= 0); + warn_if_sigpipe( + plasma_send_EvictReply(client_sock, state->builder, num_bytes_evicted), + client_sock); } break; case MessageType_PlasmaSubscribeRequest: subscribe_to_updates(client_context, client_sock); break; - case MessageType_PlasmaConnectRequest: - CHECK(plasma_send_ConnectReply(client_sock, state->builder, - state->plasma_store_info->memory_capacity) >= - 0); - break; + case MessageType_PlasmaConnectRequest: { + warn_if_sigpipe( + plasma_send_ConnectReply(client_sock, state->builder, + state->plasma_store_info->memory_capacity), + client_sock); + } break; case DISCONNECT_CLIENT: { - LOG_DEBUG("Disconnecting client on fd %d", client_sock); + LOG_INFO("Disconnecting client on fd %d", client_sock); event_loop_remove_file(loop, client_sock); /* If this client was using any objects, remove it from the appropriate * lists. */ @@ -597,6 +643,10 @@ void process_message(event_loop *loop, temp_entry) { remove_client_from_object_clients(entry, client_context); } + /* Note, the store may still attempt to send a message to the disconnected + * client (for example, when an object ID that the client was waiting for + * is ready). In these cases, the attempt to send the message will fail, but + * the store should just ignore the failure. */ } break; default: /* This code should be unreachable. */ @@ -629,6 +679,10 @@ void signal_handler(int signal) { } void start_server(char *socket_name, int64_t system_memory) { + /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write + * to a client that has already died, the store could die. */ + signal(SIGPIPE, SIG_IGN); + /* Create the event loop. 
*/ event_loop *loop = event_loop_create(); plasma_store_state *state = init_plasma_store(loop, system_memory); int socket = bind_ipc_sock(socket_name, true); diff --git a/test/component_failures_test.py b/test/component_failures_test.py new file mode 100644 index 000000000..a6e385609 --- /dev/null +++ b/test/component_failures_test.py @@ -0,0 +1,68 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import ray +import sys +import time +import unittest + +class ComponentFailureTest(unittest.TestCase): + # This test checks that when a worker dies in the middle of a get, the plasma + # store and manager will not die. + def testDyingWorkerGet(self): + obj_id = 20 * b"a" + @ray.remote + def f(): + ray.worker.global_worker.plasma_client.get(obj_id) + + ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + + # Have the worker wait in a get call. + f.remote() + + # Kill the worker. + time.sleep(1) + ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER][0].terminate() + time.sleep(0.1) + + # Seal the object so the store attempts to notify the worker that the get + # has been fulfilled. + ray.worker.global_worker.plasma_client.create(obj_id, 100) + ray.worker.global_worker.plasma_client.seal(obj_id) + time.sleep(0.1) + + # Make sure that nothing has died. + self.assertTrue(ray.services.all_processes_alive(exclude=[ray.services.PROCESS_TYPE_WORKER])) + ray.worker.cleanup() + + # This test checks that when a worker dies in the middle of a wait, the plasma + # store and manager will not die. + def testDyingWorkerWait(self): + obj_id = 20 * b"a" + @ray.remote + def f(): + ray.worker.global_worker.plasma_client.wait([obj_id]) + + ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + + # Have the worker block in a wait call. + f.remote() + + # Kill the worker. 
+ time.sleep(1) + ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER][0].terminate() + time.sleep(0.1) + + # Seal the object so the store attempts to notify the worker that the wait + # has been fulfilled. + ray.worker.global_worker.plasma_client.create(obj_id, 100) + ray.worker.global_worker.plasma_client.seal(obj_id) + time.sleep(0.1) + + # Make sure that nothing has died. + self.assertTrue(ray.services.all_processes_alive(exclude=[ray.services.PROCESS_TYPE_WORKER])) + ray.worker.cleanup() + +if __name__ == "__main__": + unittest.main(verbosity=2)