mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:06:31 +08:00
Prevent plasma store and manager from dying when a client dies. (#203)
* Prevent plasma store and manager from dying when a worker dies. * Check errno inside of warn_if_sigpipe. Passing in errno doesn't work because the arguments to warn_if_sigpipe can be evaluated out of order.
This commit is contained in:
committed by
Philipp Moritz
parent
7f329db4b2
commit
303d0fed3e
@@ -76,3 +76,4 @@ script:
|
||||
- python test/failure_test.py
|
||||
- python test/microbenchmarks.py
|
||||
- python test/stress_tests.py
|
||||
- python test/component_failures_test.py
|
||||
|
||||
+73
-51
@@ -12,17 +12,31 @@ import string
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from collections import namedtuple
|
||||
from collections import namedtuple, OrderedDict
|
||||
|
||||
# Ray modules
|
||||
import photon
|
||||
import plasma
|
||||
import global_scheduler
|
||||
|
||||
# all_processes is a list of the scheduler, object store, and worker processes
|
||||
# that have been started by this services module if Ray is being used in local
|
||||
# mode.
|
||||
all_processes = []
|
||||
PROCESS_TYPE_WORKER = "worker"
|
||||
PROCESS_TYPE_LOCAL_SCHEDULER = "local_scheduler"
|
||||
PROCESS_TYPE_PLASMA_MANAGER = "plasma_manager"
|
||||
PROCESS_TYPE_PLASMA_STORE = "plasma_store"
|
||||
PROCESS_TYPE_GLOBAL_SCHEDULER = "global_scheduler"
|
||||
PROCESS_TYPE_REDIS_SERVER = "redis_server"
|
||||
|
||||
# This is a dictionary tracking all of the processes of different types that
|
||||
# have been started by this services module. Note that the order of the keys is
|
||||
# important because it determines the order in which these processes will be
|
||||
# terminated when Ray exits, and certain orders will cause errors to be logged
|
||||
# to the screen.
|
||||
all_processes = OrderedDict([(PROCESS_TYPE_WORKER, []),
|
||||
(PROCESS_TYPE_LOCAL_SCHEDULER, []),
|
||||
(PROCESS_TYPE_PLASMA_MANAGER, []),
|
||||
(PROCESS_TYPE_PLASMA_STORE, []),
|
||||
(PROCESS_TYPE_GLOBAL_SCHEDULER, []),
|
||||
(PROCESS_TYPE_REDIS_SERVER, [])])
|
||||
|
||||
# True if processes are run in the valgrind profiler.
|
||||
RUN_PHOTON_PROFILER = False
|
||||
@@ -54,6 +68,35 @@ def new_port():
|
||||
def random_name():
|
||||
return str(random.randint(0, 99999999))
|
||||
|
||||
def kill_process(p):
|
||||
"""Kill a process.
|
||||
|
||||
Args:
|
||||
p: The process to kill.
|
||||
|
||||
Returns:
|
||||
True if the process was killed successfully and false otherwise.
|
||||
"""
|
||||
if p.poll() is not None: # process has already terminated
|
||||
return True
|
||||
if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER:
|
||||
os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data.
|
||||
time.sleep(0.1) # Wait for profiling data to be written.
|
||||
p.kill()
|
||||
# Sleeping for 0 should yield the core and allow the killed process to process
|
||||
# its pending signals.
|
||||
time.sleep(0)
|
||||
if p.poll() is not None:
|
||||
return True
|
||||
p.terminate()
|
||||
# Sleeping for 0 should yield the core and allow the killed process to process
|
||||
# its pending signals.
|
||||
time.sleep(0)
|
||||
if p.poll is not None:
|
||||
return True
|
||||
# The process was not killed for some reason.
|
||||
return False
|
||||
|
||||
def cleanup():
|
||||
"""When running in local mode, shutdown the Ray processes.
|
||||
|
||||
@@ -62,33 +105,33 @@ def cleanup():
|
||||
processes that were started by this services module. Driver processes are
|
||||
started and disconnected by worker.py.
|
||||
"""
|
||||
global all_processes
|
||||
successfully_shut_down = True
|
||||
# Terminate the processes in reverse order.
|
||||
for p in all_processes[::-1]:
|
||||
if p.poll() is not None: # process has already terminated
|
||||
continue
|
||||
if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER:
|
||||
os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data.
|
||||
time.sleep(0.1) # Wait for profiling data to be written.
|
||||
p.kill()
|
||||
time.sleep(0.05) # is this necessary?
|
||||
if p.poll() is not None:
|
||||
continue
|
||||
p.terminate()
|
||||
time.sleep(0.05) # is this necessary?
|
||||
if p.poll is not None:
|
||||
continue
|
||||
successfully_shut_down = False
|
||||
for process_type in all_processes.keys():
|
||||
# Kill all of the processes of a certain type.
|
||||
for p in all_processes[process_type]:
|
||||
success = kill_process(p)
|
||||
successfully_shut_down = successfully_shut_down and success
|
||||
# Reset the list of processes of this type.
|
||||
all_processes[process_type] = []
|
||||
if successfully_shut_down:
|
||||
if len(all_processes) > 0:
|
||||
print("Successfully shut down Ray.")
|
||||
else:
|
||||
print("Ray did not shut down properly.")
|
||||
all_processes = []
|
||||
|
||||
def all_processes_alive():
|
||||
return all([p.poll() is None for p in all_processes])
|
||||
def all_processes_alive(exclude=[]):
|
||||
"""Check if all of the processes are still alive.
|
||||
|
||||
Args:
|
||||
exclude: Don't check the processes whose types are in this list.
|
||||
"""
|
||||
for process_type, processes in all_processes.items():
|
||||
# Note that p.poll() returns the exit code that the process exited with, so
|
||||
# an exit code of None indicates that the process is still alive.
|
||||
if not all([p.poll() is None for p in processes]) and process_type not in exclude:
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_node_ip_address(address="8.8.8.8:53"):
|
||||
"""Determine the IP address of the local node.
|
||||
@@ -172,7 +215,7 @@ def start_redis(node_ip_address, num_retries=20, cleanup=True, redirect_output=F
|
||||
# not exit within 0.1 seconds).
|
||||
if p.poll() is None:
|
||||
if cleanup:
|
||||
all_processes.append(p)
|
||||
all_processes[PROCESS_TYPE_REDIS_SERVER].append(p)
|
||||
break
|
||||
counter += 1
|
||||
if counter == num_retries:
|
||||
@@ -204,7 +247,7 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False):
|
||||
"""
|
||||
p = global_scheduler.start_global_scheduler(redis_address, redirect_output=redirect_output)
|
||||
if cleanup:
|
||||
all_processes.append(p)
|
||||
all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p)
|
||||
|
||||
def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_manager_name, plasma_address=None, cleanup=True, redirect_output=False):
|
||||
"""Start a local scheduler process.
|
||||
@@ -227,7 +270,7 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, pla
|
||||
"""
|
||||
local_scheduler_name, p = photon.start_local_scheduler(plasma_store_name, plasma_manager_name, node_ip_address=node_ip_address, redis_address=redis_address, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, redirect_output=redirect_output)
|
||||
if cleanup:
|
||||
all_processes.append(p)
|
||||
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
|
||||
return local_scheduler_name
|
||||
|
||||
def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output=False, objstore_memory=None):
|
||||
@@ -273,8 +316,8 @@ def start_objstore(node_ip_address, redis_address, cleanup=True, redirect_output
|
||||
# Start the plasma manager.
|
||||
plasma_manager_name, p2, plasma_manager_port = plasma.start_plasma_manager(plasma_store_name, redis_address, node_ip_address=node_ip_address, run_profiler=RUN_PLASMA_MANAGER_PROFILER, redirect_output=redirect_output)
|
||||
if cleanup:
|
||||
all_processes.append(p1)
|
||||
all_processes.append(p2)
|
||||
all_processes[PROCESS_TYPE_PLASMA_STORE].append(p1)
|
||||
all_processes[PROCESS_TYPE_PLASMA_MANAGER].append(p2)
|
||||
|
||||
return ObjectStoreAddress(plasma_store_name, plasma_manager_name,
|
||||
plasma_manager_port)
|
||||
@@ -309,28 +352,7 @@ def start_worker(node_ip_address, object_store_name, object_store_manager_name,
|
||||
stderr = FNULL if redirect_output else None
|
||||
p = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
if cleanup:
|
||||
all_processes.append(p)
|
||||
|
||||
def start_webui(redis_port, cleanup=True, redirect_output=False):
|
||||
"""This method starts the web interface.
|
||||
|
||||
Args:
|
||||
redis_port (int): The redis server's port
|
||||
cleanup (bool): True if using Ray in local mode. If cleanup is true, then
|
||||
this process will be killed by services.cleanup() when the Python process
|
||||
that imported services exits. This is True by default.
|
||||
redirect_output (bool): True if stdout and stderr should be redirected to
|
||||
/dev/null.
|
||||
"""
|
||||
executable = "nodejs" if sys.platform == "linux" or sys.platform == "linux2" else "node"
|
||||
command = [executable, os.path.join(os.path.abspath(os.path.dirname(__file__)), "../webui/index.js"), str(redis_port)]
|
||||
with open("/tmp/webui_out.txt", "wb") as out:
|
||||
with open(os.devnull, "w") as FNULL:
|
||||
stdout = FNULL if redirect_output else out
|
||||
stderr = FNULL if redirect_output else None
|
||||
p = subprocess.Popen(command, stdout=stdout, stderr=stderr)
|
||||
if cleanup:
|
||||
all_processes.append(p)
|
||||
all_processes[PROCESS_TYPE_WORKER].append(p)
|
||||
|
||||
def start_ray_processes(address_info=None,
|
||||
node_ip_address="127.0.0.1",
|
||||
|
||||
@@ -237,7 +237,7 @@ void local_scheduler_table_handler(db_client_id client_id,
|
||||
UNUSED(id_string);
|
||||
LOG_DEBUG(
|
||||
"total workers = %d, task queue length = %d, available workers = %d",
|
||||
info.num_total_workers, info.task_queue_length, info.available_workers);
|
||||
info.total_num_workers, info.task_queue_length, info.available_workers);
|
||||
/* Update the local scheduler info struct. */
|
||||
local_scheduler *local_scheduler_ptr = get_local_scheduler(state, client_id);
|
||||
if (local_scheduler_ptr != NULL) {
|
||||
|
||||
@@ -114,7 +114,19 @@ void assign_task_to_worker(local_scheduler_state *state,
|
||||
bool from_global_scheduler) {
|
||||
CHECK(worker_index < utarray_len(state->workers));
|
||||
worker *w = (worker *) utarray_eltptr(state->workers, worker_index);
|
||||
write_message(w->sock, EXECUTE_TASK, task_spec_size(spec), (uint8_t *) spec);
|
||||
if (write_message(w->sock, EXECUTE_TASK, task_spec_size(spec),
|
||||
(uint8_t *) spec) < 0) {
|
||||
if (errno == EPIPE || errno == EBADF) {
|
||||
/* TODO(rkn): If this happens, the task should be added back to the task
|
||||
* queue. */
|
||||
LOG_WARN(
|
||||
"Failed to give task to worker on fd %d. The client may have hung "
|
||||
"up.",
|
||||
w->sock);
|
||||
} else {
|
||||
LOG_FATAL("Failed to give task to client on fd %d.", w->sock);
|
||||
}
|
||||
}
|
||||
/* Update the global task table. */
|
||||
if (state->db != NULL) {
|
||||
task *task =
|
||||
@@ -350,6 +362,9 @@ void start_server(const char *node_ip_address,
|
||||
const char *plasma_manager_socket_name,
|
||||
const char *plasma_manager_address,
|
||||
bool global_scheduler_exists) {
|
||||
/* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
|
||||
* to a client that has already died, the local scheduler could die. */
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
int fd = bind_ipc_sock(socket_name, true);
|
||||
event_loop *loop = event_loop_create();
|
||||
g_state = init_local_scheduler(
|
||||
|
||||
@@ -17,3 +17,17 @@ bool plasma_object_ids_distinct(int num_object_ids, object_id object_ids[]) {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void warn_if_sigpipe(int status, int client_sock) {
|
||||
if (status >= 0) {
|
||||
return;
|
||||
}
|
||||
if (errno == EPIPE || errno == EBADF) {
|
||||
LOG_WARN(
|
||||
"Received SIGPIPE or BAD FILE DESCRIPTOR when sending a message to "
|
||||
"client on fd %d. The client on the other end may have hung up.",
|
||||
client_sock);
|
||||
return;
|
||||
}
|
||||
LOG_FATAL("Failed to write message to client on fd %d.", client_sock);
|
||||
}
|
||||
|
||||
@@ -135,4 +135,21 @@ typedef struct {
|
||||
*/
|
||||
bool plasma_object_ids_distinct(int num_object_ids, object_id object_ids[]);
|
||||
|
||||
/**
|
||||
* Print a warning if the status is less than zero. This should be used to check
|
||||
* the success of messages sent to plasma clients. We print a warning instead of
|
||||
* failing because the plasma clients are allowed to die. This is used to handle
|
||||
* situations where the store writes to a client file descriptor, and the client
|
||||
* may already have disconnected. If we have processed the disconnection and
|
||||
* closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
|
||||
* have not, then we should get a SIGPIPE.
|
||||
*
|
||||
* @param status The status to check. If it is less less than zero, we will
|
||||
* print a warning.
|
||||
* @param client_sock The client socket. This is just used to print some extra
|
||||
* information.
|
||||
* @return Void.
|
||||
*/
|
||||
void warn_if_sigpipe(int status, int client_sock);
|
||||
|
||||
#endif /* PLASMA_H */
|
||||
|
||||
+27
-16
@@ -357,9 +357,10 @@ void remove_wait_request(plasma_manager_state *manager_state,
|
||||
void return_from_wait(plasma_manager_state *manager_state,
|
||||
wait_request *wait_req) {
|
||||
/* Send the reply to the client. */
|
||||
CHECK(plasma_send_WaitReply(wait_req->client_conn->fd, manager_state->builder,
|
||||
wait_req->object_requests,
|
||||
wait_req->num_object_requests) >= 0);
|
||||
warn_if_sigpipe(plasma_send_WaitReply(
|
||||
wait_req->client_conn->fd, manager_state->builder,
|
||||
wait_req->object_requests, wait_req->num_object_requests),
|
||||
wait_req->client_conn->fd);
|
||||
/* Remove the wait request from each of the relevant object_wait_requests hash
|
||||
* tables if it is present there. */
|
||||
for (int i = 0; i < wait_req->num_object_requests; ++i) {
|
||||
@@ -580,16 +581,20 @@ void send_queued_request(event_loop *loop,
|
||||
plasma_request_buffer *buf = conn->transfer_queue;
|
||||
switch (buf->type) {
|
||||
case MessageType_PlasmaDataRequest:
|
||||
CHECK(plasma_send_DataRequest(conn->fd, state->builder, buf->object_id,
|
||||
state->addr, state->port) >= 0);
|
||||
warn_if_sigpipe(
|
||||
plasma_send_DataRequest(conn->fd, state->builder, buf->object_id,
|
||||
state->addr, state->port),
|
||||
conn->fd);
|
||||
break;
|
||||
case MessageType_PlasmaDataReply:
|
||||
LOG_DEBUG("Transferring object to manager");
|
||||
if (conn->cursor == 0) {
|
||||
/* If the cursor is zero, we haven't sent any requests for this object
|
||||
* yet, so send the initial data request. */
|
||||
CHECK(plasma_send_DataReply(conn->fd, state->builder, buf->object_id,
|
||||
buf->data_size, buf->metadata_size) >= 0);
|
||||
warn_if_sigpipe(
|
||||
plasma_send_DataReply(conn->fd, state->builder, buf->object_id,
|
||||
buf->data_size, buf->metadata_size),
|
||||
conn->fd);
|
||||
}
|
||||
write_object_chunk(conn, buf);
|
||||
break;
|
||||
@@ -1169,9 +1174,10 @@ void request_status_done(object_id object_id,
|
||||
client_connection *client_conn = (client_connection *) context;
|
||||
int status =
|
||||
request_status(object_id, manager_count, manager_vector, context);
|
||||
CHECK(plasma_send_StatusReply(client_conn->fd,
|
||||
client_conn->manager_state->builder, &object_id,
|
||||
&status, 1) >= 0);
|
||||
warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd,
|
||||
client_conn->manager_state->builder,
|
||||
&object_id, &status, 1),
|
||||
client_conn->fd);
|
||||
}
|
||||
|
||||
int request_status(object_id object_id,
|
||||
@@ -1204,17 +1210,19 @@ void process_status_request(client_connection *client_conn,
|
||||
/* Return success immediately if we already have this object. */
|
||||
if (is_object_local(client_conn->manager_state, object_id)) {
|
||||
int status = ObjectStatus_Local;
|
||||
CHECK(plasma_send_StatusReply(client_conn->fd,
|
||||
client_conn->manager_state->builder,
|
||||
&object_id, &status, 1) >= 0);
|
||||
warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd,
|
||||
client_conn->manager_state->builder,
|
||||
&object_id, &status, 1),
|
||||
client_conn->fd);
|
||||
return;
|
||||
}
|
||||
|
||||
if (client_conn->manager_state->db == NULL) {
|
||||
int status = ObjectStatus_Nonexistent;
|
||||
CHECK(plasma_send_StatusReply(client_conn->fd,
|
||||
client_conn->manager_state->builder,
|
||||
&object_id, &status, 1) >= 0);
|
||||
warn_if_sigpipe(plasma_send_StatusReply(client_conn->fd,
|
||||
client_conn->manager_state->builder,
|
||||
&object_id, &status, 1),
|
||||
client_conn->fd);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1427,6 +1435,9 @@ void start_server(const char *store_socket_name,
|
||||
int port,
|
||||
const char *db_addr,
|
||||
int db_port) {
|
||||
/* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
|
||||
* to a client that has already died, the manager could die. */
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
/* Bind the sockets before we try to connect to the plasma store.
|
||||
* In case the bind does not succeed, we want to be able to exit
|
||||
* without breaking the pipe to the store. */
|
||||
|
||||
+82
-28
@@ -353,9 +353,34 @@ void seal_object(client *client_context,
|
||||
/* Send notifications to the clients that were waiting for this object. */
|
||||
for (int i = 0; i < utarray_len(notify_entry->waiting_clients); ++i) {
|
||||
client **c = (client **) utarray_eltptr(notify_entry->waiting_clients, i);
|
||||
CHECK(plasma_send_GetReply((*c)->sock, plasma_state->builder, &object_id,
|
||||
&object, 1) >= 0);
|
||||
CHECK(send_fd((*c)->sock, object.handle.store_fd) >= 0);
|
||||
int status;
|
||||
/* Send the get reply to the client. Handle errors on the write. If the
|
||||
* client has hung up, that's ok. */
|
||||
if (plasma_send_GetReply((*c)->sock, plasma_state->builder, &object_id,
|
||||
&object, 1) < 0) {
|
||||
if (errno == EPIPE || errno == EBADF) {
|
||||
LOG_WARN(
|
||||
"Failed to send a message to client on fd %d. The client may "
|
||||
"have hung up.",
|
||||
(*c)->sock);
|
||||
continue;
|
||||
} else {
|
||||
LOG_FATAL("Failed to send a message to client on fd %d.", (*c)->sock);
|
||||
}
|
||||
}
|
||||
/* Send the object's file descriptor to the client. Handle errors on the
|
||||
* write. If the client has hung up, that's ok. */
|
||||
if (send_fd((*c)->sock, object.handle.store_fd) < 0) {
|
||||
if (errno == EPIPE || errno == EBADF) {
|
||||
LOG_WARN(
|
||||
"Failed to send a message to client on fd %d. The client may "
|
||||
"have hung up.",
|
||||
(*c)->sock);
|
||||
continue;
|
||||
} else {
|
||||
LOG_FATAL("Failed to send a message to client on fd %d.", (*c)->sock);
|
||||
}
|
||||
}
|
||||
/* Record that the client is using this object. */
|
||||
add_client_to_object_clients(entry, *c);
|
||||
}
|
||||
@@ -458,7 +483,7 @@ void send_notifications(event_loop *loop,
|
||||
send_notifications, plasma_state);
|
||||
break;
|
||||
} else {
|
||||
CHECKM(0, "This code should be unreachable.");
|
||||
LOG_WARN("Failed to send notification to client on fd %d", client_sock);
|
||||
}
|
||||
num_processed += 1;
|
||||
}
|
||||
@@ -474,8 +499,14 @@ void send_notifications(event_loop *loop,
|
||||
void subscribe_to_updates(client *client_context, int conn) {
|
||||
LOG_DEBUG("subscribing to updates");
|
||||
plasma_store_state *plasma_state = client_context->plasma_state;
|
||||
/* TODO(rkn): The store could block here if the client doesn't send a file
|
||||
* descriptor. */
|
||||
int fd = recv_fd(conn);
|
||||
CHECK(fd >= 0);
|
||||
if (fd < 0) {
|
||||
/* This may mean that the client died before sending the file descriptor. */
|
||||
LOG_WARN("Failed to receive file descriptor from client on fd %d.", conn);
|
||||
return;
|
||||
}
|
||||
CHECKM(HASH_CNT(handle, plasma_state->plasma_store_info->objects) == 0,
|
||||
"plasma_subscribe should be called before any objects are created.");
|
||||
/* Create a new array to buffer notifications that can't be sent to the
|
||||
@@ -515,19 +546,24 @@ void process_message(event_loop *loop,
|
||||
&metadata_size);
|
||||
int error_code = create_object(client_context, object_ids[0], data_size,
|
||||
metadata_size, &objects[0]);
|
||||
CHECK(plasma_send_CreateReply(client_sock, state->builder, object_ids[0],
|
||||
&objects[0], error_code) >= 0);
|
||||
warn_if_sigpipe(
|
||||
plasma_send_CreateReply(client_sock, state->builder, object_ids[0],
|
||||
&objects[0], error_code),
|
||||
client_sock);
|
||||
if (error_code == PlasmaError_OK) {
|
||||
CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0);
|
||||
warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd),
|
||||
client_sock);
|
||||
}
|
||||
} break;
|
||||
case MessageType_PlasmaGetRequest: {
|
||||
plasma_read_GetRequest(input, object_ids, 1);
|
||||
if (get_object(client_context, client_sock, object_ids[0], &objects[0]) ==
|
||||
OBJECT_FOUND) {
|
||||
CHECK(plasma_send_GetReply(client_sock, state->builder, object_ids,
|
||||
objects, 1) >= 0);
|
||||
CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0);
|
||||
warn_if_sigpipe(plasma_send_GetReply(client_sock, state->builder,
|
||||
object_ids, objects, 1),
|
||||
client_sock);
|
||||
warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd),
|
||||
client_sock);
|
||||
}
|
||||
} break;
|
||||
case MessageType_PlasmaGetLocalRequest: {
|
||||
@@ -535,13 +571,19 @@ void process_message(event_loop *loop,
|
||||
if (get_object_local(client_context, client_sock, object_ids[0],
|
||||
&objects[0]) == OBJECT_FOUND) {
|
||||
int has_object = 1;
|
||||
CHECK(plasma_send_GetLocalReply(client_sock, state->builder, object_ids,
|
||||
objects, &has_object, 1) >= 0);
|
||||
CHECK(send_fd(client_sock, objects[0].handle.store_fd) >= 0);
|
||||
warn_if_sigpipe(
|
||||
plasma_send_GetLocalReply(client_sock, state->builder, object_ids,
|
||||
objects, &has_object, 1),
|
||||
client_sock);
|
||||
warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd),
|
||||
client_sock);
|
||||
} else {
|
||||
int has_object = 0;
|
||||
CHECK(plasma_send_GetLocalReply(client_sock, state->builder, object_ids,
|
||||
objects, &has_object, 1) >= 0);
|
||||
|
||||
warn_if_sigpipe(
|
||||
plasma_send_GetLocalReply(client_sock, state->builder, object_ids,
|
||||
objects, &has_object, 1),
|
||||
client_sock);
|
||||
}
|
||||
} break;
|
||||
case MessageType_PlasmaReleaseRequest:
|
||||
@@ -551,11 +593,13 @@ void process_message(event_loop *loop,
|
||||
case MessageType_PlasmaContainsRequest:
|
||||
plasma_read_ContainsRequest(input, &object_ids[0]);
|
||||
if (contains_object(client_context, object_ids[0]) == OBJECT_FOUND) {
|
||||
CHECK(plasma_send_ContainsReply(client_sock, state->builder,
|
||||
object_ids[0], 1) >= 0);
|
||||
warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder,
|
||||
object_ids[0], 1),
|
||||
client_sock);
|
||||
} else {
|
||||
CHECK(plasma_send_ContainsReply(client_sock, state->builder,
|
||||
object_ids[0], 0) >= 0);
|
||||
warn_if_sigpipe(plasma_send_ContainsReply(client_sock, state->builder,
|
||||
object_ids[0], 0),
|
||||
client_sock);
|
||||
}
|
||||
break;
|
||||
case MessageType_PlasmaSealRequest: {
|
||||
@@ -575,19 +619,21 @@ void process_message(event_loop *loop,
|
||||
&num_objects_to_evict, &objects_to_evict);
|
||||
remove_objects(client_context->plasma_state, num_objects_to_evict,
|
||||
objects_to_evict);
|
||||
CHECK(plasma_send_EvictReply(client_sock, state->builder,
|
||||
num_bytes_evicted) >= 0);
|
||||
warn_if_sigpipe(
|
||||
plasma_send_EvictReply(client_sock, state->builder, num_bytes_evicted),
|
||||
client_sock);
|
||||
} break;
|
||||
case MessageType_PlasmaSubscribeRequest:
|
||||
subscribe_to_updates(client_context, client_sock);
|
||||
break;
|
||||
case MessageType_PlasmaConnectRequest:
|
||||
CHECK(plasma_send_ConnectReply(client_sock, state->builder,
|
||||
state->plasma_store_info->memory_capacity) >=
|
||||
0);
|
||||
break;
|
||||
case MessageType_PlasmaConnectRequest: {
|
||||
warn_if_sigpipe(
|
||||
plasma_send_ConnectReply(client_sock, state->builder,
|
||||
state->plasma_store_info->memory_capacity),
|
||||
client_sock);
|
||||
} break;
|
||||
case DISCONNECT_CLIENT: {
|
||||
LOG_DEBUG("Disconnecting client on fd %d", client_sock);
|
||||
LOG_INFO("Disconnecting client on fd %d", client_sock);
|
||||
event_loop_remove_file(loop, client_sock);
|
||||
/* If this client was using any objects, remove it from the appropriate
|
||||
* lists. */
|
||||
@@ -597,6 +643,10 @@ void process_message(event_loop *loop,
|
||||
temp_entry) {
|
||||
remove_client_from_object_clients(entry, client_context);
|
||||
}
|
||||
/* Note, the store may still attempt to send a message to the disconnected
|
||||
* client (for example, when an object ID that the client was waiting for
|
||||
* is ready). In these cases, the attempt to send the message will fail, but
|
||||
* the store should just ignore the failure. */
|
||||
} break;
|
||||
default:
|
||||
/* This code should be unreachable. */
|
||||
@@ -629,6 +679,10 @@ void signal_handler(int signal) {
|
||||
}
|
||||
|
||||
void start_server(char *socket_name, int64_t system_memory) {
|
||||
/* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
|
||||
* to a client that has already died, the store could die. */
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
/* Create the event loop. */
|
||||
event_loop *loop = event_loop_create();
|
||||
plasma_store_state *state = init_plasma_store(loop, system_memory);
|
||||
int socket = bind_ipc_sock(socket_name, true);
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import ray
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
class ComponentFailureTest(unittest.TestCase):
|
||||
# This test checks that when a worker dies in the middle of a get, the plasma
|
||||
# store and manager will not die.
|
||||
def testDyingWorkerGet(self):
|
||||
obj_id = 20 * b"a"
|
||||
@ray.remote
|
||||
def f():
|
||||
ray.worker.global_worker.plasma_client.get(obj_id)
|
||||
|
||||
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
|
||||
|
||||
# Have the worker wait in a get call.
|
||||
f.remote()
|
||||
|
||||
# Kill the worker.
|
||||
time.sleep(1)
|
||||
ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER][0].terminate()
|
||||
time.sleep(0.1)
|
||||
|
||||
# Seal the object so the store attempts to notify the worker that the get
|
||||
# has been fulfilled.
|
||||
ray.worker.global_worker.plasma_client.create(obj_id, 100)
|
||||
ray.worker.global_worker.plasma_client.seal(obj_id)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Make sure that nothing has died.
|
||||
self.assertTrue(ray.services.all_processes_alive(exclude=[ray.services.PROCESS_TYPE_WORKER]))
|
||||
ray.worker.cleanup()
|
||||
|
||||
# This test checks that when a worker dies in the middle of a wait, the plasma
|
||||
# store and manager will not die.
|
||||
def testDyingWorkerWait(self):
|
||||
obj_id = 20 * b"a"
|
||||
@ray.remote
|
||||
def f():
|
||||
ray.worker.global_worker.plasma_client.wait([obj_id])
|
||||
|
||||
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
|
||||
|
||||
# Have the worker wait in a get call.
|
||||
f.remote()
|
||||
|
||||
# Kill the worker.
|
||||
time.sleep(1)
|
||||
ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER][0].terminate()
|
||||
time.sleep(0.1)
|
||||
|
||||
# Seal the object so the store attempts to notify the worker that the get
|
||||
# has been fulfilled.
|
||||
ray.worker.global_worker.plasma_client.create(obj_id, 100)
|
||||
ray.worker.global_worker.plasma_client.seal(obj_id)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Make sure that nothing has died.
|
||||
self.assertTrue(ray.services.all_processes_alive(exclude=[ray.services.PROCESS_TYPE_WORKER]))
|
||||
ray.worker.cleanup()
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
Reference in New Issue
Block a user