From 2b8e6485e3454313e6e1355a55e7eb07eb8a01b5 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Fri, 10 Feb 2017 12:46:23 -0800 Subject: [PATCH] Start and clean up workers from the local scheduler. (#250) * Start and clean up workers from the local scheduler Ability to kill workers in photon scheduler Test for old method of starting workers Common codepath for killing workers Common codepath for killing workers Photon test case for starting and killing workers fix build Fix component failure test Register a worker's pid as part of initial connection Address comments and revert photon_connect Set PATH during travis install Fix * Fix photon test case to accept clients on plasma manager fd --- .travis.yml | 2 + python/photon/photon_services.py | 25 ++- python/ray/services.py | 98 ++++++--- python/ray/worker.py | 19 +- src/photon/photon.h | 24 ++- src/photon/photon_client.c | 5 + src/photon/photon_extension.c | 1 + src/photon/photon_scheduler.c | 334 ++++++++++++++++++++++++------- src/photon/photon_scheduler.h | 7 +- src/photon/test/photon_tests.c | 180 +++++++++++++---- test/component_failures_test.py | 10 +- test/runtest.py | 16 ++ 12 files changed, 556 insertions(+), 165 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6c12b07e5..1aa93d968 100644 --- a/.travis.yml +++ b/.travis.yml @@ -53,6 +53,8 @@ install: - ./.travis/install-dependencies.sh - ./.travis/install-ray.sh + - if [[ "$PYTHON" == "3.5" ]]; then export PATH="$HOME/miniconda/bin:$PATH"; fi + - cd python/core - bash ../../src/common/test/run_tests.sh - bash ../../src/plasma/test/run_tests.sh diff --git a/python/photon/photon_services.py b/python/photon/photon_services.py index cf98006a1..eaab77df0 100644 --- a/python/photon/photon_services.py +++ b/python/photon/photon_services.py @@ -11,11 +11,17 @@ import time def random_name(): return str(random.randint(0, 99999999)) -def start_local_scheduler(plasma_store_name, plasma_manager_name=None, - worker_path=None, plasma_address=None, - node_ip_address="127.0.0.1", redis_address=None, - use_valgrind=False, use_profiler=False, - redirect_output=False, static_resource_list=None): +def start_local_scheduler(plasma_store_name, + plasma_manager_name=None, + worker_path=None, + plasma_address=None, + node_ip_address="127.0.0.1", + redis_address=None, + use_valgrind=False, + use_profiler=False, + redirect_output=False, + static_resource_list=None, + num_workers=0): """Start a local scheduler process. Args: @@ -41,6 +47,8 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, static_resource_list (list): A list of integers specifying the local scheduler's resource capacities. The resources should appear in an order matching the order defined in task.h. + num_workers (int): The number of workers that the local scheduler should + start. Return: A tuple of the name of the local scheduler socket and the process ID of the @@ -52,7 +60,12 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, raise Exception("Cannot use valgrind and profiler at the same time.") local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/photon/photon_scheduler") local_scheduler_name = "/tmp/scheduler{}".format(random_name()) - command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name, "-h", node_ip_address] + command = [local_scheduler_executable, + "-s", local_scheduler_name, + "-p", plasma_store_name, + "-h", node_ip_address, + "-n", str(num_workers), + ] if plasma_manager_name is not None: command += ["-m", plasma_manager_name] if worker_path is not None: diff --git a/python/ray/services.py b/python/ray/services.py index b5f7c9463..55aedf9aa 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -13,6 +13,7 @@ import subprocess import sys import time from collections import namedtuple, OrderedDict +import threading # Ray modules import photon @@ -89,18 +90,24 @@ def kill_process(p): if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data. time.sleep(0.1) # Wait for profiling data to be written. - p.kill() - # Sleeping for 0 should yield the core and allow the killed process to process - # its pending signals. - time.sleep(0) + + # Allow the process one second to exit gracefully. + p.terminate() + timer = threading.Timer(1, lambda p: p.kill(), [p]) + try: + timer.start() + p.wait() + finally: + timer.cancel() + if p.poll() is not None: return True - p.terminate() - # Sleeping for 0 should yield the core and allow the killed process to process - # its pending signals. - time.sleep(0) - if p.poll is not None: + + # If the process did not exit within one second, force kill it. + p.kill() + if p.poll() is not None: return True + # The process was not killed for some reason. return False @@ -262,10 +269,16 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): if cleanup: all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p) -def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, - plasma_manager_name, worker_path, plasma_address=None, - cleanup=True, redirect_output=False, - static_resource_list=None): +def start_local_scheduler(redis_address, + node_ip_address, + plasma_store_name, + plasma_manager_name, + worker_path, + plasma_address=None, + cleanup=True, + redirect_output=False, + static_resource_list=None, + num_workers=0): """Start a local scheduler process. Args: @@ -284,6 +297,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, /dev/null. static_resource_list (list): An ordered list of the configured resource capacities for this local scheduler. + num_workers (int): The number of workers that the local scheduler should + start. Return: The name of the local scheduler socket. @@ -296,7 +311,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, redirect_output=redirect_output, - static_resource_list=static_resource_list) + static_resource_list=static_resource_list, + num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) return local_scheduler_name @@ -391,6 +407,7 @@ def start_ray_processes(address_info=None, redirect_output=False, include_global_scheduler=False, include_redis=False, + start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None): """Helper method to start Ray processes. @@ -417,6 +434,9 @@ def start_ray_processes(address_info=None, start a global scheduler process. include_redis (bool): If include_redis is True, then start a Redis server process. + start_workers_from_local_scheduler (bool): If this flag is True, then start + the initial workers from the local scheduler. Else, start them from + Python. num_cpus: A list of length num_local_schedulers containing the number of CPUs each local scheduler should be configured with. num_gpus: A list of length num_local_schedulers containing the number of @@ -489,12 +509,25 @@ def start_ray_processes(address_info=None, object_store_addresses.append(object_store_address) time.sleep(0.1) + # Determine how many workers to start for each local scheduler. + num_workers_per_local_scheduler = [0] * num_local_schedulers + for i in range(num_workers): + num_workers_per_local_scheduler[i % num_local_schedulers] += 1 + # Start any local schedulers that do not yet exist. for i in range(len(local_scheduler_socket_names), num_local_schedulers): # Connect the local scheduler to the object store at the same index. object_store_address = object_store_addresses[i] plasma_address = "{}:{}".format(node_ip_address, object_store_address.manager_port) + # Determine how many workers this local scheduler should start. + if start_workers_from_local_scheduler: + num_local_scheduler_workers = num_workers_per_local_scheduler[i] + num_workers_per_local_scheduler[i] = 0 + else: + # If we're starting the workers from Python, the local scheduler should + # not start any workers. + num_local_scheduler_workers = 0 # Start the local scheduler. local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, @@ -504,7 +537,8 @@ def start_ray_processes(address_info=None, plasma_address=plasma_address, cleanup=cleanup, redirect_output=redirect_output, - static_resource_list=[num_cpus[i], num_gpus[i]]) + static_resource_list=[num_cpus[i], num_gpus[i]], + num_workers=num_local_scheduler_workers) local_scheduler_socket_names.append(local_scheduler_name) time.sleep(0.1) @@ -513,18 +547,23 @@ def start_ray_processes(address_info=None, assert len(object_store_addresses) == num_local_schedulers assert len(local_scheduler_socket_names) == num_local_schedulers - # Start the workers. - for i in range(num_workers): - object_store_address = object_store_addresses[i % num_local_schedulers] - local_scheduler_name = local_scheduler_socket_names[i % num_local_schedulers] - start_worker(node_ip_address, - object_store_address.name, - object_store_address.manager_name, - local_scheduler_name, - redis_address, - worker_path, - cleanup=cleanup, - redirect_output=redirect_output) + # Start any workers that the local scheduler has not already started. + for i, num_local_scheduler_workers in enumerate(num_workers_per_local_scheduler): + object_store_address = object_store_addresses[i] + local_scheduler_name = local_scheduler_socket_names[i] + for j in range(num_local_scheduler_workers): + start_worker(node_ip_address, + object_store_address.name, + object_store_address.manager_name, + local_scheduler_name, + redis_address, + worker_path, + cleanup=cleanup, + redirect_output=redirect_output) + num_workers_per_local_scheduler[i] -= 1 + + # Make sure that we've started all the workers. + assert(sum(num_workers_per_local_scheduler) == 0) # Return the addresses of the relevant processes. return address_info @@ -581,6 +620,7 @@ def start_ray_head(address_info=None, worker_path=None, cleanup=True, redirect_output=False, + start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None): """Start Ray in local mode. @@ -603,6 +643,9 @@ def start_ray_head(address_info=None, method exits. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. + start_workers_from_local_scheduler (bool): If this flag is True, then start + the initial workers from the local scheduler. Else, start them from + Python. num_cpus (int): number of cpus to configure the local scheduler with. num_gpus (int): number of gpus to configure the local scheduler with. @@ -619,5 +662,6 @@ def start_ray_head(address_info=None, redirect_output=redirect_output, include_global_scheduler=True, include_redis=True, + start_workers_from_local_scheduler=start_workers_from_local_scheduler, num_cpus=num_cpus, num_gpus=num_gpus) diff --git a/python/ray/worker.py b/python/ray/worker.py index 8930e98ce..1b98bdd3b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -735,9 +735,15 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5): time.sleep(1) counter += 1 -def _init(address_info=None, start_ray_local=False, object_id_seed=None, - num_workers=None, num_local_schedulers=None, - driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None): +def _init(address_info=None, + start_ray_local=False, + object_id_seed=None, + num_workers=None, + num_local_schedulers=None, + driver_mode=SCRIPT_MODE, + start_workers_from_local_scheduler=True, + num_cpus=None, + num_gpus=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -764,6 +770,9 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, only provided if start_ray_local is True. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + start_workers_from_local_scheduler (bool): If this flag is True, then start + the initial workers from the local scheduler. Else, start them from + Python. The latter case is for debugging purposes only. num_cpus: A list containing the number of CPUs the local schedulers should be configured with. num_gpus: A list containing the number of GPUs the local schedulers should @@ -815,7 +824,9 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, - num_cpus=num_cpus, num_gpus=num_gpus) + start_workers_from_local_scheduler=start_workers_from_local_scheduler, + num_cpus=num_cpus, + num_gpus=num_gpus) else: if redis_address is None: raise Exception("If start_ray_local=False, then redis_address must be provided.") diff --git a/src/photon/photon.h b/src/photon/photon.h index 691daad64..4df923ac4 100644 --- a/src/photon/photon.h +++ b/src/photon/photon.h @@ -27,20 +27,14 @@ enum photon_message_type { RECONSTRUCT_OBJECT, /** Log a message to the event table. */ EVENT_LOG_MESSAGE, + /** Register a worker's process ID with the local scheduler. */ + REGISTER_PID, }; -// clang-format off -/** Contains all information that is associated to a worker. */ -typedef struct { - int sock; - /** A pointer to a task object, to update the task table. */ - task *task_in_progress; -} worker; -// clang-format on - /* These are needed to define the UT_arrays. */ UT_icd task_ptr_icd; -UT_icd worker_icd; +UT_icd workers_icd; +UT_icd pid_t_icd; /** Internal state of the scheduling algorithm. */ typedef struct scheduling_algorithm_state scheduling_algorithm_state; @@ -50,7 +44,7 @@ typedef struct scheduling_algorithm_state scheduling_algorithm_state; * scheduler. */ typedef struct { /** The script to use when starting a new worker. */ - char *start_worker_command; + const char **start_worker_command; /** Whether there is a global scheduler. */ bool global_scheduler_exists; } local_scheduler_config; @@ -65,6 +59,9 @@ typedef struct { * structs when we free the scheduler state and also to access the worker * structs in the tests. */ UT_array *workers; + /** List of the process IDs for child processes (workers) started by the + * local scheduler that have not sent a REGISTER_PID message yet. */ + UT_array *child_pids; /** The handle to the database. */ db_handle *db; /** The Plasma client. */ @@ -90,6 +87,11 @@ typedef struct { * no task is running on the worker, this will be NULL. This is used to * update the task table. */ task *task_in_progress; + /** The process ID of the client. If this is set to zero, the client has not + * yet registered a process ID. */ + pid_t pid; + /** Whether the client is a child process of the local scheduler. */ + bool is_child; /** A pointer to the local scheduler state. */ local_scheduler_state *local_scheduler_state; } local_scheduler_client; diff --git a/src/photon/photon_client.c b/src/photon/photon_client.c index 5452a299a..4b41e03a5 100644 --- a/src/photon/photon_client.c +++ b/src/photon/photon_client.c @@ -7,6 +7,11 @@ photon_conn *photon_connect(const char *photon_socket) { photon_conn *result = malloc(sizeof(photon_conn)); result->conn = connect_ipc_sock(photon_socket); + /* If this is a worker, register the process ID with the local scheduler. */ + pid_t my_pid = getpid(); + int success = write_message(result->conn, REGISTER_PID, sizeof(my_pid), + (uint8_t *) &my_pid); + CHECKM(success == 0, "Unable to register worker with local scheduler"); return result; } diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 428a8cf04..851fc65c5 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -20,6 +20,7 @@ static int PyPhotonClient_init(PyPhotonClient *self, if (!PyArg_ParseTuple(args, "s", &socket_name)) { return -1; } + /* Connect to the Photon scheduler. */ self->photon_connection = photon_connect(socket_name); return 0; } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 676fd9b62..a572dcc00 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "common.h" @@ -23,6 +24,8 @@ UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL}; UT_icd workers_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL}; +UT_icd pid_t_icd = {sizeof(pid_t), NULL, NULL, NULL}; + UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; /** @@ -53,6 +56,184 @@ void print_resource_info(const local_scheduler_state *state, #endif } +/** + * Kill a worker, if it is a child process, and clean up all of its associated + * state. + * + * @param worker A pointer to the worker we want to kill. + * @param wait A bool representing whether we should wait for the worker's + * process to exit. If the worker is not a child process, this flag is + * ignored. + * @return Void. + */ +void kill_worker(local_scheduler_client *worker, bool wait) { + /* Erase the worker from the array of workers. */ + local_scheduler_state *state = worker->local_scheduler_state; + int num_workers = utarray_len(state->workers); + for (int i = 0; i < utarray_len(state->workers); ++i) { + local_scheduler_client *active_worker = + *(local_scheduler_client **) utarray_eltptr(state->workers, i); + if (active_worker == worker) { + utarray_erase(state->workers, i, 1); + } + } + /* Make sure that we erased exactly 1 worker. */ + CHECKM(!(utarray_len(state->workers) < num_workers - 1), + "Found duplicate workers"); + CHECKM(utarray_len(state->workers) != num_workers, + "Tried to kill worker that doesn't exist"); + + /* Remove the client socket from the event loop so that we don't process the + * SIGPIPE when the worker is killed. */ + event_loop_remove_file(worker->local_scheduler_state->loop, worker->sock); + + /* If the worker has registered a process ID with us and it's a child + * process, use it to send a kill signal. */ + if (worker->is_child && worker->pid != 0) { + kill(worker->pid, SIGKILL); + if (wait) { + /* Wait for the process to exit. */ + waitpid(worker->pid, NULL, 0); + } + } + + /* Clean up the client socket after killing the worker so that the worker + * can't receive the SIGPIPE before exiting. */ + close(worker->sock); + + /* Clean up the task in progress. */ + if (worker->task_in_progress) { + /* TODO(swang): Update the task table to mark the task as lost. */ + free_task(worker->task_in_progress); + } + + LOG_DEBUG("Killed worker with pid %d", worker->pid); + free(worker); +} + +void free_local_scheduler(local_scheduler_state *state) { + /* Free the command for starting new workers. */ + if (state->config.start_worker_command != NULL) { + int i = 0; + const char *arg = state->config.start_worker_command[i]; + while (arg != NULL) { + free((void *) arg); + ++i; + arg = state->config.start_worker_command[i]; + } + free(state->config.start_worker_command); + state->config.start_worker_command = NULL; + } + + /* Disconnect from the database. */ + if (state->db != NULL) { + db_disconnect(state->db); + state->db = NULL; + } + /* Disconnect from plasma. */ + plasma_disconnect(state->plasma_conn); + state->plasma_conn = NULL; + + /* Kill any child processes that didn't register as a worker yet. */ + pid_t *worker_pid; + for (worker_pid = (pid_t *) utarray_front(state->child_pids); + worker_pid != NULL; + worker_pid = (pid_t *) utarray_next(state->child_pids, worker_pid)) { + kill(*worker_pid, SIGKILL); + waitpid(*worker_pid, NULL, 0); + LOG_DEBUG("Killed pid %d", *worker_pid); + } + utarray_free(state->child_pids); + + /* Free the list of workers and any tasks that are still in progress on those + * workers. */ + for (local_scheduler_client **worker = + (local_scheduler_client **) utarray_front(state->workers); + worker != NULL; + worker = (local_scheduler_client **) utarray_front(state->workers)) { + kill_worker(*worker, true); + } + utarray_free(state->workers); + state->workers = NULL; + + /* Free the algorithm state. */ + free_scheduling_algorithm_state(state->algorithm_state); + state->algorithm_state = NULL; + /* Free the input buffer. */ + utarray_free(state->input_buffer); + state->input_buffer = NULL; + /* Destroy the event loop. */ + event_loop_destroy(state->loop); + state->loop = NULL; + /* Free the scheduler state. */ + free(state); +} + +/** + * Start a new worker as a child process. + * + * @param state The state of the local scheduler. + * @return Void. + */ +void start_worker(local_scheduler_state *state) { + /* We can't start a worker if we don't have the path to the worker script. */ + CHECK(state->config.start_worker_command != NULL); + /* Launch the process to create the worker. */ + pid_t pid = fork(); + if (pid != 0) { + utarray_push_back(state->child_pids, &pid); + LOG_DEBUG("Started worker with pid %d", pid); + return; + } + + /* Try to execute the worker command. Exit if we're not successful. */ + execvp(state->config.start_worker_command[0], + (char *const *) state->config.start_worker_command); + free_local_scheduler(state); + LOG_FATAL("Failed to start worker"); +} + +/** + * Parse the command to start a worker. This takes in the command string, + * splits it into tokens on the space characters, and allocates an array of the + * tokens, terminated by a NULL pointer. + * + * @param command The command string to start a worker. + * @return A pointer to an array of strings, the tokens in the command string. + * The last element is a NULL pointer. + */ +const char **parse_command(const char *command) { + /* Count the number of tokens. */ + char *command_copy = strdup(command); + const char *delimiter = " "; + char *token = NULL; + int num_args = 0; + token = strtok(command_copy, delimiter); + while (token != NULL) { + ++num_args; + token = strtok(NULL, delimiter); + } + free(command_copy); + + /* Allocate a NULL-terminated array for the tokens. */ + const char **command_args = malloc((num_args + 1) * sizeof(const char *)); + command_args[num_args] = NULL; + + /* Fill in the token array. */ + command_copy = strdup(command); + token = strtok(command_copy, delimiter); + int i = 0; + while (token != NULL) { + command_args[i] = strdup(token); + ++i; + token = strtok(NULL, delimiter); + } + free(command_copy); + + CHECK(num_args == i); + return command_args; +} + local_scheduler_state *init_local_scheduler( const char *node_ip_address, event_loop *loop, @@ -63,12 +244,13 @@ local_scheduler_state *init_local_scheduler( const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, + const double static_resource_conf[], const char *start_worker_command, - const double static_resource_conf[]) { + int num_workers) { local_scheduler_state *state = malloc(sizeof(local_scheduler_state)); /* Set the configuration struct for the local scheduler. */ if (start_worker_command != NULL) { - state->config.start_worker_command = strdup(start_worker_command); + state->config.start_worker_command = parse_command(start_worker_command); } else { state->config.start_worker_command = NULL; } @@ -123,53 +305,16 @@ local_scheduler_state *init_local_scheduler( } /* Print some debug information about resource configuration. */ print_resource_info(state, NULL); + + /* Start the initial set of workers. */ + utarray_new(state->child_pids, &pid_t_icd); + for (int i = 0; i < num_workers; ++i) { + start_worker(state); + } + return state; }; -void free_local_scheduler(local_scheduler_state *state) { - /* Free the command for starting new workers. */ - if (state->config.start_worker_command != NULL) { - free(state->config.start_worker_command); - state->config.start_worker_command = NULL; - } - - /* Disconnect from the database. */ - if (state->db != NULL) { - db_disconnect(state->db); - state->db = NULL; - } - /* Disconnect from plasma. */ - plasma_disconnect(state->plasma_conn); - state->plasma_conn = NULL; - - /* Free the list of workers and any tasks that are still in progress on those - * workers. */ - for (int i = 0; i < utarray_len(state->workers); ++i) { - local_scheduler_client **worker = - (local_scheduler_client **) utarray_eltptr(state->workers, i); - if ((*worker)->task_in_progress != NULL) { - free_task((*worker)->task_in_progress); - (*worker)->task_in_progress = NULL; - } - free(*worker); - *worker = NULL; - } - utarray_free(state->workers); - state->workers = NULL; - - /* Free the algorithm state. */ - free_scheduling_algorithm_state(state->algorithm_state); - state->algorithm_state = NULL; - /* Free the input buffer. */ - utarray_free(state->input_buffer); - state->input_buffer = NULL; - /* Destroy the event loop. */ - event_loop_destroy(state->loop); - state->loop = NULL; - /* Free the scheduler state. */ - free(state); -} - void assign_task_to_worker(local_scheduler_state *state, task_spec *spec, local_scheduler_client *worker) { @@ -384,10 +529,33 @@ void process_message(event_loop *loop, } break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); - event_loop_remove_file(loop, client_sock); + kill_worker(worker, false); } break; case LOG_MESSAGE: { } break; + case REGISTER_PID: { + pid_t *worker_pid = (pid_t *) utarray_front(state->input_buffer); + worker->pid = *worker_pid; + + /* Determine if this worker is one of our child processes. */ + LOG_DEBUG("Pid is %d", *worker_pid); + pid_t *child_pid; + int index = 0; + for (child_pid = (pid_t *) utarray_front(state->child_pids); + child_pid != NULL; + child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) { + if (*child_pid == *worker_pid) { + /* If this worker is one of our child processes, mark it as a child so + * that we know that we can wait for the process to exit during + * cleanup. */ + worker->is_child = true; + utarray_erase(state->child_pids, index, 1); + LOG_DEBUG("Found matching child pid %d", *worker_pid); + break; + } + ++index; + } + } break; default: /* This code should be unreachable. */ CHECK(0); @@ -405,6 +573,8 @@ void new_client_connection(event_loop *loop, local_scheduler_client *worker = malloc(sizeof(local_scheduler_client)); worker->sock = new_socket; worker->task_in_progress = NULL; + worker->pid = 0; + worker->is_child = false; worker->local_scheduler_state = state; utarray_push_back(state->workers, &worker); event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, @@ -417,6 +587,7 @@ void new_client_connection(event_loop *loop, local_scheduler_state *g_state; void signal_handler(int signal) { + LOG_DEBUG("Signal was %d", signal); if (signal == SIGTERM) { free_local_scheduler(g_state); exit(0); @@ -442,14 +613,6 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) { return LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS; } -void start_new_worker(local_scheduler_state *state) { - /* We can't start a worker if we don't have the path to the worker script. */ - CHECK(state->config.start_worker_command != NULL); - /* Launch the process to create the worker. */ - FILE *p = popen(state->config.start_worker_command, "r"); - UNUSED(p); -} - void start_server(const char *node_ip_address, const char *socket_name, const char *redis_addr, @@ -458,8 +621,9 @@ void start_server(const char *node_ip_address, const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, + const double static_resource_conf[], const char *start_worker_command, - const double static_resource_conf[]) { + int num_workers) { /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write * to a client that has already died, the local scheduler could die. */ signal(SIGPIPE, SIG_IGN); @@ -468,8 +632,8 @@ void start_server(const char *node_ip_address, g_state = init_local_scheduler( node_ip_address, loop, redis_addr, redis_port, socket_name, plasma_store_socket_name, plasma_manager_socket_name, - plasma_manager_address, global_scheduler_exists, start_worker_command, - static_resource_conf); + plasma_manager_address, global_scheduler_exists, static_resource_conf, + start_worker_command, num_workers); /* Register a callback for registering new clients. */ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, g_state); @@ -518,14 +682,16 @@ int main(int argc, char *argv[]) { char *plasma_manager_address = NULL; /* The IP address of the node that this local scheduler is running on. */ char *node_ip_address = NULL; - /* The command to run when starting new workers. */ - char *start_worker_command = NULL; /* Comma-separated list of configured resource capabilities for this node. */ char *static_resource_list = NULL; double static_resource_conf[MAX_RESOURCE_INDEX]; + /* The command to run when starting new workers. */ + char *start_worker_command = NULL; + /* The number of workers to start. */ + char *num_workers_str = NULL; int c; bool global_scheduler_exists = true; - while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:c:")) != -1) { + while ((c = getopt(argc, argv, "s:r:p:m:ga:h:c:w:n:")) != -1) { switch (c) { case 's': scheduler_socket_name = optarg; @@ -548,11 +714,14 @@ int main(int argc, char *argv[]) { case 'h': node_ip_address = optarg; break; + case 'c': + static_resource_list = optarg; + break; case 'w': start_worker_command = optarg; break; - case 'c': - static_resource_list = optarg; + case 'n': + num_workers_str = optarg; break; default: LOG_FATAL("unknown option %c", c); @@ -585,6 +754,16 @@ int main(int argc, char *argv[]) { if (!node_ip_address) { LOG_FATAL("please specify the node IP address with -h switch"); } + int num_workers = 0; + if (num_workers_str) { + num_workers = strtol(num_workers_str, NULL, 10); + if (num_workers < 0) { + LOG_FATAL("Number of workers must be nonnegative"); + } + } + + char *redis_addr = NULL; + int redis_port = -1; if (!redis_addr_port) { /* Start the local scheduler without connecting to Redis. In this case, all * submitted tasks will be queued and scheduled locally. */ @@ -593,31 +772,34 @@ int main(int argc, char *argv[]) { "if a plasma manager socket name is provided with the -m switch, " "then a redis address must be provided with the -r switch"); } - start_server(node_ip_address, scheduler_socket_name, NULL, -1, - plasma_store_socket_name, NULL, plasma_manager_address, - global_scheduler_exists, start_worker_command, - static_resource_conf); } else { + char redis_addr_buffer[16] = {0}; + char redis_port_str[6] = {0}; /* Parse the Redis address into an IP address and a port. */ - char redis_addr[16] = {0}; - char redis_port[6] = {0}; - int num_assigned = - sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port); + int num_assigned = sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", + redis_addr_buffer, redis_port_str); if (num_assigned != 2) { LOG_FATAL( "if a redis address is provided with the -r switch, it should be " "formatted like 127.0.0.1:6379"); } + redis_addr = redis_addr_buffer; + redis_port = strtol(redis_port_str, NULL, 10); + if (redis_port == 0) { + LOG_FATAL("Unable to parse port number from redis address %s", + redis_addr_port); + } if (!plasma_manager_socket_name) { LOG_FATAL( "please specify socket for connecting to Plasma manager with -m " "switch"); } - start_server(node_ip_address, scheduler_socket_name, &redis_addr[0], - atoi(redis_port), plasma_store_socket_name, - plasma_manager_socket_name, plasma_manager_address, - global_scheduler_exists, start_worker_command, - static_resource_conf); } + + LOG_INFO("Start worker command is %s", start_worker_command); + start_server(node_ip_address, scheduler_socket_name, redis_addr, redis_port, + plasma_store_socket_name, plasma_manager_socket_name, + plasma_manager_address, global_scheduler_exists, + static_resource_conf, start_worker_command, num_workers); } #endif diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 3fa3f9344..5e454804c 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -79,8 +79,9 @@ local_scheduler_state *init_local_scheduler( const char *plasma_store_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, + const double static_resource_vector[], const char *worker_path, - const double static_resource_vector[]); + int num_workers); void free_local_scheduler(local_scheduler_state *state); @@ -90,6 +91,10 @@ void process_message(event_loop *loop, int client_sock, void *context, int events); + +void kill_worker(local_scheduler_client *worker, bool wait); + +void start_worker(local_scheduler_state *state); #endif #endif /* PHOTON_SCHEDULER_H */ diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index 414e2c31b..1ff10e907 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -33,25 +33,33 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { } typedef struct { - /** A socket to mock the Plasma store. */ - int plasma_fd; + /** A socket to mock the Plasma manager. Clients (such as workers) that + * connect to this file descriptor must be accepted. */ + int plasma_manager_fd; + /** A socket to communicate with the Plasma store. */ + int plasma_store_fd; /** Photon's socket for IPC requests. */ int photon_fd; /** Photon's local scheduler state. */ local_scheduler_state *photon_state; /** Photon's event loop. */ event_loop *loop; - /** A Photon client connection. */ - photon_conn *conn; + /** Number of Photon client connections, or mock workers. */ + int num_photon_conns; + /** Photon client connections. */ + photon_conn **conns; } photon_mock; -photon_mock *init_photon_mock(bool connect_to_redis) { +photon_mock *init_photon_mock(bool connect_to_redis, + int num_workers, + int num_mock_workers) { + const char *node_ip_address = "127.0.0.1"; const char *redis_addr = NULL; int redis_port = -1; const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS, DEFAULT_NUM_GPUS}; if (connect_to_redis) { - redis_addr = "127.0.0.1"; + redis_addr = node_ip_address; redis_port = 6379; } @@ -59,34 +67,60 @@ photon_mock *init_photon_mock(bool connect_to_redis) { memset(mock, 0, sizeof(photon_mock)); mock->loop = event_loop_create(); /* Bind to the Photon port and initialize the Photon scheduler. */ - /* TODO(rkn): Why are we reusing mock->plasma_fd for both the store and the - * manager? */ - UT_string *plasma_manager_socket_name = - bind_ipc_sock_retry(plasma_manager_socket_name_format, &mock->plasma_fd); - mock->plasma_fd = socket_connect_retry(plasma_store_socket_name, 5, 100); + UT_string *plasma_manager_socket_name = bind_ipc_sock_retry( + plasma_manager_socket_name_format, &mock->plasma_manager_fd); + mock->plasma_store_fd = + socket_connect_retry(plasma_store_socket_name, 5, 100); UT_string *photon_socket_name = bind_ipc_sock_retry(photon_socket_name_format, &mock->photon_fd); - CHECK(mock->plasma_fd >= 0 && mock->photon_fd >= 0); + CHECK(mock->plasma_store_fd >= 0 && mock->photon_fd >= 0); + + UT_string *worker_command; + utstring_new(worker_command); + utstring_printf(worker_command, + "python ../../python/ray/workers/default_worker.py " + "--node-ip-address=%s --object-store-name=%s " + "--object-store-manager-name=%s --local-scheduler-name=%s " + "--redis-address=%s:%d", + node_ip_address, plasma_store_socket_name, + utstring_body(plasma_manager_socket_name), + utstring_body(photon_socket_name), redis_addr, redis_port); + mock->photon_state = init_local_scheduler( "127.0.0.1", mock->loop, redis_addr, redis_port, utstring_body(photon_socket_name), plasma_store_socket_name, - utstring_body(plasma_manager_socket_name), NULL, false, NULL, - static_resource_conf); + utstring_body(plasma_manager_socket_name), NULL, false, + static_resource_conf, utstring_body(worker_command), num_workers); + + /* Accept the workers as clients to the plasma manager. */ + for (int i = 0; i < num_workers; ++i) { + accept_client(mock->plasma_manager_fd); + } + /* Connect a Photon client. */ - mock->conn = photon_connect(utstring_body(photon_socket_name)); - new_client_connection(mock->loop, mock->photon_fd, - (void *) mock->photon_state, 0); + mock->num_photon_conns = num_mock_workers; + mock->conns = malloc(sizeof(photon_conn *) * num_mock_workers); + for (int i = 0; i < num_mock_workers; ++i) { + mock->conns[i] = photon_connect(utstring_body(photon_socket_name)); + new_client_connection(mock->loop, mock->photon_fd, + (void *) mock->photon_state, 0); + } + + utstring_free(worker_command); utstring_free(plasma_manager_socket_name); utstring_free(photon_socket_name); return mock; } void destroy_photon_mock(photon_mock *mock) { - photon_disconnect(mock->conn); - close(mock->photon_fd); - close(mock->plasma_fd); + for (int i = 0; i < mock->num_photon_conns; ++i) { + photon_disconnect(mock->conns[i]); + } + free(mock->conns); /* This also frees mock->loop. */ free_local_scheduler(mock->photon_state); + close(mock->plasma_store_fd); + close(mock->plasma_manager_fd); free(mock); } @@ -103,7 +137,9 @@ void reset_worker(photon_mock *mock, local_scheduler_client *worker) { * value, the task should get assigned to a worker again. */ TEST object_reconstruction_test(void) { - photon_mock *photon = init_photon_mock(true); + photon_mock *photon = init_photon_mock(true, 0, 1); + photon_conn *worker = photon->conns[0]; + /* Create a task with zero dependencies and one return value. */ task_spec *spec = example_task_spec(0, 1); object_id return_id = task_return(spec, 0); @@ -125,10 +161,10 @@ TEST object_reconstruction_test(void) { if (pid == 0) { /* Make sure we receive the task twice. First from the initial submission, * and second from the reconstruct request. */ - photon_submit(photon->conn, spec); - task_spec *task_assigned = photon_get_task(photon->conn); + photon_submit(worker, spec); + task_spec *task_assigned = photon_get_task(worker); ASSERT_EQ(memcmp(task_assigned, spec, task_spec_size(spec)), 0); - task_spec *reconstruct_task = photon_get_task(photon->conn); + task_spec *reconstruct_task = photon_get_task(worker); ASSERT_EQ(memcmp(reconstruct_task, spec, task_spec_size(spec)), 0); /* Clean up. */ free_task_spec(reconstruct_task); @@ -150,7 +186,7 @@ TEST object_reconstruction_test(void) { (retry_info *) &photon_retry, NULL, NULL); /* Trigger reconstruction, and run the event loop again. */ object_id return_id = task_return(spec, 0); - photon_reconstruct_object(photon->conn, return_id); + photon_reconstruct_object(worker, return_id); event_loop_add_timer(photon->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(photon->loop); @@ -171,7 +207,8 @@ TEST object_reconstruction_test(void) { * should trigger reconstruction of all previous tasks in the lineage. */ TEST object_reconstruction_recursive_test(void) { - photon_mock *photon = init_photon_mock(true); + photon_mock *photon = init_photon_mock(true, 0, 1); + photon_conn *worker = photon->conns[0]; /* Create a chain of tasks, each one dependent on the one before it. Mark * each object as available so that tasks will run immediately. */ const int NUM_TASKS = 10; @@ -204,11 +241,11 @@ TEST object_reconstruction_recursive_test(void) { if (pid == 0) { /* Submit the tasks, and make sure each one gets assigned to a worker. */ for (int i = 0; i < NUM_TASKS; ++i) { - photon_submit(photon->conn, specs[i]); + photon_submit(worker, specs[i]); } /* Make sure we receive each task from the initial submission. */ for (int i = 0; i < NUM_TASKS; ++i) { - task_spec *task_assigned = photon_get_task(photon->conn); + task_spec *task_assigned = photon_get_task(worker); ASSERT_EQ(memcmp(task_assigned, specs[i], task_spec_size(task_assigned)), 0); free_task_spec(task_assigned); @@ -216,7 +253,7 @@ TEST object_reconstruction_recursive_test(void) { /* Check that the workers receive all tasks in the final return object's * lineage during reconstruction. */ for (int i = 0; i < NUM_TASKS; ++i) { - task_spec *task_assigned = photon_get_task(photon->conn); + task_spec *task_assigned = photon_get_task(worker); bool found = false; for (int j = 0; j < NUM_TASKS; ++j) { if (specs[j] == NULL) { @@ -249,7 +286,7 @@ TEST object_reconstruction_recursive_test(void) { /* Trigger reconstruction for the last object, and run the event loop * again. */ object_id return_id = task_return(specs[NUM_TASKS - 1], 0); - photon_reconstruct_object(photon->conn, return_id); + photon_reconstruct_object(worker, return_id); event_loop_add_timer(photon->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(photon->loop); @@ -275,25 +312,27 @@ task_spec *object_reconstruction_suppression_spec; void object_reconstruction_suppression_callback(object_id object_id, void *user_context) { /* Submit the task after adding the object to the object table. */ - photon_mock *photon = user_context; - photon_submit(photon->conn, object_reconstruction_suppression_spec); + photon_conn *worker = user_context; + photon_submit(worker, object_reconstruction_suppression_spec); } TEST object_reconstruction_suppression_test(void) { - photon_mock *photon = init_photon_mock(true); + photon_mock *photon = init_photon_mock(true, 0, 1); + photon_conn *worker = photon->conns[0]; + object_reconstruction_suppression_spec = example_task_spec(0, 1); object_id return_id = task_return(object_reconstruction_suppression_spec, 0); pid_t pid = fork(); if (pid == 0) { /* Make sure we receive the task once. This will block until the * object_table_add callback completes. */ - task_spec *task_assigned = photon_get_task(photon->conn); + task_spec *task_assigned = photon_get_task(worker); ASSERT_EQ(memcmp(task_assigned, object_reconstruction_suppression_spec, task_spec_size(object_reconstruction_suppression_spec)), 0); /* Trigger a reconstruction. We will check that no tasks get queued as a * result of this line in the event loop process. */ - photon_reconstruct_object(photon->conn, return_id); + photon_reconstruct_object(worker, return_id); /* Clean up. */ free_task_spec(task_assigned); free_task_spec(object_reconstruction_suppression_spec); @@ -309,7 +348,7 @@ TEST object_reconstruction_suppression_test(void) { object_table_add(db, return_id, 1, (unsigned char *) NIL_DIGEST, (retry_info *) &photon_retry, object_reconstruction_suppression_callback, - (void *) photon); + (void *) worker); /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ event_loop_add_timer(photon->loop, 1000, @@ -328,7 +367,7 @@ TEST object_reconstruction_suppression_test(void) { } TEST task_dependency_test(void) { - photon_mock *photon = init_photon_mock(false); + photon_mock *photon = init_photon_mock(false, 0, 1); local_scheduler_state *state = photon->photon_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state; /* Get the first worker. */ @@ -403,7 +442,7 @@ TEST task_dependency_test(void) { } TEST task_multi_dependency_test(void) { - photon_mock *photon = init_photon_mock(false); + photon_mock *photon = init_photon_mock(false, 0, 1); local_scheduler_state *state = photon->photon_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state; /* Get the first worker. */ @@ -476,12 +515,77 @@ TEST task_multi_dependency_test(void) { PASS(); } +TEST start_kill_workers_test(void) { + /* Start some workers. */ + int num_workers = 4; + photon_mock *photon = init_photon_mock(true, num_workers, 0); + /* We start off with num_workers children processes, but no workers + * registered yet. */ + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); + ASSERT_EQ(utarray_len(photon->photon_state->workers), 0); + + /* Make sure that each worker connects to the photon scheduler. This for loop + * will hang if one of the workers does not connect. */ + for (int i = 0; i < num_workers; ++i) { + new_client_connection(photon->loop, photon->photon_fd, + (void *) photon->photon_state, 0); + } + + /* After handling each worker's initial connection, we should now have all + * workers accounted for, but we haven't yet matched up process IDs with our + * children processes. */ + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + + /* Each worker should register its process ID. */ + for (int i = 0; i < utarray_len(photon->photon_state->workers); ++i) { + local_scheduler_client *worker = + *(local_scheduler_client **) utarray_eltptr( + photon->photon_state->workers, i); + process_message(photon->photon_state->loop, worker->sock, worker, 0); + } + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + + /* After killing a worker, its state is cleaned up. */ + local_scheduler_client *worker = *(local_scheduler_client **) utarray_eltptr( + photon->photon_state->workers, 0); + kill_worker(worker, true); + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); + + /* Start a worker after the local scheduler has been initialized. */ + start_worker(photon->photon_state); + /* Accept the workers as clients to the plasma manager. */ + int new_worker_fd = accept_client(photon->plasma_manager_fd); + /* The new worker should register its process ID. */ + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); + /* Make sure the new worker connects to the photon scheduler. */ + new_client_connection(photon->loop, photon->photon_fd, + (void *) photon->photon_state, 0); + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + /* Make sure that the new worker registers its process ID. */ + worker = *(local_scheduler_client **) utarray_eltptr( + photon->photon_state->workers, num_workers - 1); + process_message(photon->photon_state->loop, worker->sock, worker, 0); + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + + /* Clean up. */ + close(new_worker_fd); + destroy_photon_mock(photon); + PASS(); +} + SUITE(photon_tests) { RUN_REDIS_TEST(object_reconstruction_test); RUN_REDIS_TEST(object_reconstruction_recursive_test); RUN_REDIS_TEST(object_reconstruction_suppression_test); RUN_TEST(task_dependency_test); RUN_TEST(task_multi_dependency_test); + RUN_TEST(start_kill_workers_test); } GREATEST_MAIN_DEFS(); diff --git a/test/component_failures_test.py b/test/component_failures_test.py index a6e385609..e1bf0219a 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -16,7 +16,10 @@ class ComponentFailureTest(unittest.TestCase): def f(): ray.worker.global_worker.plasma_client.get(obj_id) - ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.worker._init(num_workers=1, + driver_mode=ray.SILENT_MODE, + start_workers_from_local_scheduler=False, + start_ray_local=True) # Have the worker wait in a get call. f.remote() @@ -44,7 +47,10 @@ class ComponentFailureTest(unittest.TestCase): def f(): ray.worker.global_worker.plasma_client.wait([obj_id]) - ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.worker._init(num_workers=1, + driver_mode=ray.SILENT_MODE, + start_workers_from_local_scheduler=False, + start_ray_local=True) # Have the worker wait in a get call. f.remote() diff --git a/test/runtest.py b/test/runtest.py index 24aeff7a1..702725f7c 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -183,6 +183,22 @@ class SerializationTest(unittest.TestCase): class WorkerTest(unittest.TestCase): + def testPythonWorkers(self): + # Test the codepath for starting workers from the Python script, instead of + # the local scheduler. This codepath is for debugging purposes only. + num_workers = 4 + ray.worker._init(num_workers=num_workers, + start_workers_from_local_scheduler=False, + start_ray_local=True) + + @ray.remote + def f(x): + return x + + values = ray.get([f.remote(1) for i in range(num_workers * 2)]) + self.assertEqual(values, [1] * (num_workers * 2)) + ray.worker.cleanup() + def testPutGet(self): ray.init(num_workers=0)