diff --git a/.travis.yml b/.travis.yml index 6c12b07e5..1aa93d968 100644 --- a/.travis.yml +++ b/.travis.yml @@ -53,6 +53,8 @@ install: - ./.travis/install-dependencies.sh - ./.travis/install-ray.sh + - if [[ "$PYTHON" == "3.5" ]]; then export PATH="$HOME/miniconda/bin:$PATH"; fi + - cd python/core - bash ../../src/common/test/run_tests.sh - bash ../../src/plasma/test/run_tests.sh diff --git a/python/photon/photon_services.py b/python/photon/photon_services.py index cf98006a1..eaab77df0 100644 --- a/python/photon/photon_services.py +++ b/python/photon/photon_services.py @@ -11,11 +11,17 @@ import time def random_name(): return str(random.randint(0, 99999999)) -def start_local_scheduler(plasma_store_name, plasma_manager_name=None, - worker_path=None, plasma_address=None, - node_ip_address="127.0.0.1", redis_address=None, - use_valgrind=False, use_profiler=False, - redirect_output=False, static_resource_list=None): +def start_local_scheduler(plasma_store_name, + plasma_manager_name=None, + worker_path=None, + plasma_address=None, + node_ip_address="127.0.0.1", + redis_address=None, + use_valgrind=False, + use_profiler=False, + redirect_output=False, + static_resource_list=None, + num_workers=0): """Start a local scheduler process. Args: @@ -41,6 +47,8 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, static_resource_list (list): A list of integers specifying the local scheduler's resource capacities. The resources should appear in an order matching the order defined in task.h. + num_workers (int): The number of workers that the local scheduler should + start. 
Return: A tuple of the name of the local scheduler socket and the process ID of the @@ -52,7 +60,12 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None, raise Exception("Cannot use valgrind and profiler at the same time.") local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/photon/photon_scheduler") local_scheduler_name = "/tmp/scheduler{}".format(random_name()) - command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name, "-h", node_ip_address] + command = [local_scheduler_executable, + "-s", local_scheduler_name, + "-p", plasma_store_name, + "-h", node_ip_address, + "-n", str(num_workers), + ] if plasma_manager_name is not None: command += ["-m", plasma_manager_name] if worker_path is not None: diff --git a/python/ray/services.py b/python/ray/services.py index b5f7c9463..55aedf9aa 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -13,6 +13,7 @@ import subprocess import sys import time from collections import namedtuple, OrderedDict +import threading # Ray modules import photon @@ -89,18 +90,24 @@ def kill_process(p): if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER: os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data. time.sleep(0.1) # Wait for profiling data to be written. - p.kill() - # Sleeping for 0 should yield the core and allow the killed process to process - # its pending signals. - time.sleep(0) + + # Allow the process one second to exit gracefully. + p.terminate() + timer = threading.Timer(1, lambda p: p.kill(), [p]) + try: + timer.start() + p.wait() + finally: + timer.cancel() + if p.poll() is not None: return True - p.terminate() - # Sleeping for 0 should yield the core and allow the killed process to process - # its pending signals. - time.sleep(0) - if p.poll is not None: + + # If the process did not exit within one second, force kill it. 
+ p.kill() + if p.poll() is not None: return True + # The process was not killed for some reason. return False @@ -262,10 +269,16 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): if cleanup: all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p) -def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, - plasma_manager_name, worker_path, plasma_address=None, - cleanup=True, redirect_output=False, - static_resource_list=None): +def start_local_scheduler(redis_address, + node_ip_address, + plasma_store_name, + plasma_manager_name, + worker_path, + plasma_address=None, + cleanup=True, + redirect_output=False, + static_resource_list=None, + num_workers=0): """Start a local scheduler process. Args: @@ -284,6 +297,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, /dev/null. static_resource_list (list): An ordered list of the configured resource capacities for this local scheduler. + num_workers (int): The number of workers that the local scheduler should + start. Return: The name of the local scheduler socket. @@ -296,7 +311,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name, plasma_address=plasma_address, use_profiler=RUN_PHOTON_PROFILER, redirect_output=redirect_output, - static_resource_list=static_resource_list) + static_resource_list=static_resource_list, + num_workers=num_workers) if cleanup: all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p) return local_scheduler_name @@ -391,6 +407,7 @@ def start_ray_processes(address_info=None, redirect_output=False, include_global_scheduler=False, include_redis=False, + start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None): """Helper method to start Ray processes. @@ -417,6 +434,9 @@ def start_ray_processes(address_info=None, start a global scheduler process. include_redis (bool): If include_redis is True, then start a Redis server process. 
+ start_workers_from_local_scheduler (bool): If this flag is True, then start + the initial workers from the local scheduler. Else, start them from + Python. num_cpus: A list of length num_local_schedulers containing the number of CPUs each local scheduler should be configured with. num_gpus: A list of length num_local_schedulers containing the number of @@ -489,12 +509,25 @@ def start_ray_processes(address_info=None, object_store_addresses.append(object_store_address) time.sleep(0.1) + # Determine how many workers to start for each local scheduler. + num_workers_per_local_scheduler = [0] * num_local_schedulers + for i in range(num_workers): + num_workers_per_local_scheduler[i % num_local_schedulers] += 1 + # Start any local schedulers that do not yet exist. for i in range(len(local_scheduler_socket_names), num_local_schedulers): # Connect the local scheduler to the object store at the same index. object_store_address = object_store_addresses[i] plasma_address = "{}:{}".format(node_ip_address, object_store_address.manager_port) + # Determine how many workers this local scheduler should start. + if start_workers_from_local_scheduler: + num_local_scheduler_workers = num_workers_per_local_scheduler[i] + num_workers_per_local_scheduler[i] = 0 + else: + # If we're starting the workers from Python, the local scheduler should + # not start any workers. + num_local_scheduler_workers = 0 # Start the local scheduler. 
local_scheduler_name = start_local_scheduler(redis_address, node_ip_address, @@ -504,7 +537,8 @@ def start_ray_processes(address_info=None, plasma_address=plasma_address, cleanup=cleanup, redirect_output=redirect_output, - static_resource_list=[num_cpus[i], num_gpus[i]]) + static_resource_list=[num_cpus[i], num_gpus[i]], + num_workers=num_local_scheduler_workers) local_scheduler_socket_names.append(local_scheduler_name) time.sleep(0.1) @@ -513,18 +547,23 @@ def start_ray_processes(address_info=None, assert len(object_store_addresses) == num_local_schedulers assert len(local_scheduler_socket_names) == num_local_schedulers - # Start the workers. - for i in range(num_workers): - object_store_address = object_store_addresses[i % num_local_schedulers] - local_scheduler_name = local_scheduler_socket_names[i % num_local_schedulers] - start_worker(node_ip_address, - object_store_address.name, - object_store_address.manager_name, - local_scheduler_name, - redis_address, - worker_path, - cleanup=cleanup, - redirect_output=redirect_output) + # Start any workers that the local scheduler has not already started. + for i, num_local_scheduler_workers in enumerate(num_workers_per_local_scheduler): + object_store_address = object_store_addresses[i] + local_scheduler_name = local_scheduler_socket_names[i] + for j in range(num_local_scheduler_workers): + start_worker(node_ip_address, + object_store_address.name, + object_store_address.manager_name, + local_scheduler_name, + redis_address, + worker_path, + cleanup=cleanup, + redirect_output=redirect_output) + num_workers_per_local_scheduler[i] -= 1 + + # Make sure that we've started all the workers. + assert(sum(num_workers_per_local_scheduler) == 0) # Return the addresses of the relevant processes. return address_info @@ -581,6 +620,7 @@ def start_ray_head(address_info=None, worker_path=None, cleanup=True, redirect_output=False, + start_workers_from_local_scheduler=True, num_cpus=None, num_gpus=None): """Start Ray in local mode. 
@@ -603,6 +643,9 @@ def start_ray_head(address_info=None, method exits. redirect_output (bool): True if stdout and stderr should be redirected to /dev/null. + start_workers_from_local_scheduler (bool): If this flag is True, then start + the initial workers from the local scheduler. Else, start them from + Python. num_cpus (int): number of cpus to configure the local scheduler with. num_gpus (int): number of gpus to configure the local scheduler with. @@ -619,5 +662,6 @@ def start_ray_head(address_info=None, redirect_output=redirect_output, include_global_scheduler=True, include_redis=True, + start_workers_from_local_scheduler=start_workers_from_local_scheduler, num_cpus=num_cpus, num_gpus=num_gpus) diff --git a/python/ray/worker.py b/python/ray/worker.py index 8930e98ce..1b98bdd3b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -735,9 +735,15 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5): time.sleep(1) counter += 1 -def _init(address_info=None, start_ray_local=False, object_id_seed=None, - num_workers=None, num_local_schedulers=None, - driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None): +def _init(address_info=None, + start_ray_local=False, + object_id_seed=None, + num_workers=None, + num_local_schedulers=None, + driver_mode=SCRIPT_MODE, + start_workers_from_local_scheduler=True, + num_cpus=None, + num_gpus=None): """Helper method to connect to an existing Ray cluster or start a new one. This method handles two cases. Either a Ray cluster already exists and we @@ -764,6 +770,9 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, only provided if start_ray_local is True. driver_mode (bool): The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE. + start_workers_from_local_scheduler (bool): If this flag is True, then start + the initial workers from the local scheduler. Else, start them from + Python. 
The latter case is for debugging purposes only. num_cpus: A list containing the number of CPUs the local schedulers should be configured with. num_gpus: A list containing the number of GPUs the local schedulers should @@ -815,7 +824,9 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, - num_cpus=num_cpus, num_gpus=num_gpus) + start_workers_from_local_scheduler=start_workers_from_local_scheduler, + num_cpus=num_cpus, + num_gpus=num_gpus) else: if redis_address is None: raise Exception("If start_ray_local=False, then redis_address must be provided.") diff --git a/src/photon/photon.h b/src/photon/photon.h index 691daad64..4df923ac4 100644 --- a/src/photon/photon.h +++ b/src/photon/photon.h @@ -27,20 +27,14 @@ enum photon_message_type { RECONSTRUCT_OBJECT, /** Log a message to the event table. */ EVENT_LOG_MESSAGE, + /** Register a worker's process ID with the local scheduler. */ + REGISTER_PID, }; -// clang-format off -/** Contains all information that is associated to a worker. */ -typedef struct { - int sock; - /** A pointer to a task object, to update the task table. */ - task *task_in_progress; -} worker; -// clang-format on - /* These are needed to define the UT_arrays. */ UT_icd task_ptr_icd; -UT_icd worker_icd; +UT_icd workers_icd; +UT_icd pid_t_icd; /** Internal state of the scheduling algorithm. */ typedef struct scheduling_algorithm_state scheduling_algorithm_state; @@ -50,7 +44,7 @@ typedef struct scheduling_algorithm_state scheduling_algorithm_state; * scheduler. */ typedef struct { /** The script to use when starting a new worker. */ - char *start_worker_command; + const char **start_worker_command; /** Whether there is a global scheduler. */ bool global_scheduler_exists; } local_scheduler_config; @@ -65,6 +59,9 @@ typedef struct { * structs when we free the scheduler state and also to access the worker * structs in the tests. 
*/ UT_array *workers; + /** List of the process IDs for child processes (workers) started by the + * local scheduler that have not sent a REGISTER_PID message yet. */ + UT_array *child_pids; /** The handle to the database. */ db_handle *db; /** The Plasma client. */ @@ -90,6 +87,11 @@ typedef struct { * no task is running on the worker, this will be NULL. This is used to * update the task table. */ task *task_in_progress; + /** The process ID of the client. If this is set to zero, the client has not + * yet registered a process ID. */ + pid_t pid; + /** Whether the client is a child process of the local scheduler. */ + bool is_child; /** A pointer to the local scheduler state. */ local_scheduler_state *local_scheduler_state; } local_scheduler_client; diff --git a/src/photon/photon_client.c b/src/photon/photon_client.c index 5452a299a..4b41e03a5 100644 --- a/src/photon/photon_client.c +++ b/src/photon/photon_client.c @@ -7,6 +7,11 @@ photon_conn *photon_connect(const char *photon_socket) { photon_conn *result = malloc(sizeof(photon_conn)); result->conn = connect_ipc_sock(photon_socket); + /* If this is a worker, register the process ID with the local scheduler. */ + pid_t my_pid = getpid(); + int success = write_message(result->conn, REGISTER_PID, sizeof(my_pid), + (uint8_t *) &my_pid); + CHECKM(success == 0, "Unable to register worker with local scheduler"); return result; } diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 428a8cf04..851fc65c5 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -20,6 +20,7 @@ static int PyPhotonClient_init(PyPhotonClient *self, if (!PyArg_ParseTuple(args, "s", &socket_name)) { return -1; } + /* Connect to the Photon scheduler. 
*/ self->photon_connection = photon_connect(socket_name); return 0; } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 676fd9b62..a572dcc00 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "common.h" @@ -23,6 +24,8 @@ UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL}; UT_icd workers_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL}; +UT_icd pid_t_icd = {sizeof(pid_t), NULL, NULL, NULL}; + UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; /** @@ -53,6 +56,184 @@ void print_resource_info(const local_scheduler_state *state, #endif } +/** + * Kill a worker, if it is a child process, and clean up all of its associated + * state. + * + * @param worker A pointer to the worker we want to kill. + * @param wait A bool representing whether we should wait for the worker's + * process to exit. If the worker is not a child process, this flag is + * ignored. + * @return Void. + */ +void kill_worker(local_scheduler_client *worker, bool wait) { + /* Erase the worker from the array of workers. */ + local_scheduler_state *state = worker->local_scheduler_state; + int num_workers = utarray_len(state->workers); + for (int i = 0; i < utarray_len(state->workers); ++i) { + local_scheduler_client *active_worker = + *(local_scheduler_client **) utarray_eltptr(state->workers, i); + if (active_worker == worker) { + utarray_erase(state->workers, i, 1); + } + } + /* Make sure that we erased exactly 1 worker. */ + CHECKM(!(utarray_len(state->workers) < num_workers - 1), + "Found duplicate workers"); + CHECKM(utarray_len(state->workers) != num_workers, + "Tried to kill worker that doesn't exist"); + + /* Remove the client socket from the event loop so that we don't process the + * SIGPIPE when the worker is killed. 
*/ + event_loop_remove_file(worker->local_scheduler_state->loop, worker->sock); + + /* If the worker has registered a process ID with us and it's a child + * process, use it to send a kill signal. */ + if (worker->is_child && worker->pid != 0) { + kill(worker->pid, SIGKILL); + if (wait) { + /* Wait for the process to exit. */ + waitpid(worker->pid, NULL, 0); + } + } + + /* Clean up the client socket after killing the worker so that the worker + * can't receive the SIGPIPE before exiting. */ + close(worker->sock); + + /* Clean up the task in progress. */ + if (worker->task_in_progress) { + /* TODO(swang): Update the task table to mark the task as lost. */ + free_task(worker->task_in_progress); + } + + LOG_DEBUG("Killed worker with pid %d", worker->pid); + free(worker); +} + +void free_local_scheduler(local_scheduler_state *state) { + /* Free the command for starting new workers. */ + if (state->config.start_worker_command != NULL) { + int i = 0; + const char *arg = state->config.start_worker_command[i]; + while (arg != NULL) { + free((void *) arg); + ++i; + arg = state->config.start_worker_command[i]; + } + free(state->config.start_worker_command); + state->config.start_worker_command = NULL; + } + + /* Disconnect from the database. */ + if (state->db != NULL) { + db_disconnect(state->db); + state->db = NULL; + } + /* Disconnect from plasma. */ + plasma_disconnect(state->plasma_conn); + state->plasma_conn = NULL; + + /* Kill any child processes that didn't register as a worker yet. */ + pid_t *worker_pid; + for (worker_pid = (pid_t *) utarray_front(state->child_pids); + worker_pid != NULL; + worker_pid = (pid_t *) utarray_next(state->child_pids, worker_pid)) { + kill(*worker_pid, SIGKILL); + waitpid(*worker_pid, NULL, 0); + LOG_DEBUG("Killed pid %d", *worker_pid); + } + utarray_free(state->child_pids); + + /* Free the list of workers and any tasks that are still in progress on those + * workers. 
*/ + for (local_scheduler_client **worker = + (local_scheduler_client **) utarray_front(state->workers); + worker != NULL; + worker = (local_scheduler_client **) utarray_front(state->workers)) { + kill_worker(*worker, true); + } + utarray_free(state->workers); + state->workers = NULL; + + /* Free the algorithm state. */ + free_scheduling_algorithm_state(state->algorithm_state); + state->algorithm_state = NULL; + /* Free the input buffer. */ + utarray_free(state->input_buffer); + state->input_buffer = NULL; + /* Destroy the event loop. */ + event_loop_destroy(state->loop); + state->loop = NULL; + /* Free the scheduler state. */ + free(state); +} + +/** + * Start a new worker as a child process. + * + * @param state The state of the local scheduler. + * @return Void. + */ +void start_worker(local_scheduler_state *state) { + /* We can't start a worker if we don't have the path to the worker script. */ + CHECK(state->config.start_worker_command != NULL); + /* Launch the process to create the worker. */ + pid_t pid = fork(); + if (pid != 0) { + utarray_push_back(state->child_pids, &pid); + LOG_DEBUG("Started worker with pid %d", pid); + return; + } + + /* Try to execute the worker command. Exit if we're not successful. */ + execvp(state->config.start_worker_command[0], + (char *const *) state->config.start_worker_command); + free_local_scheduler(state); + LOG_FATAL("Failed to start worker"); +} + +/** + * Parse the command to start a worker. This takes in the command string, + * splits it into tokens on the space characters, and allocates an array of the + * tokens, terminated by a NULL pointer. + * + * @param command The command string to start a worker. + * @return A pointer to an array of strings, the tokens in the command string. + * The last element is a NULL pointer. + */ +const char **parse_command(const char *command) { + /* Count the number of tokens. 
*/ + char *command_copy = strdup(command); + const char *delimiter = " "; + char *token = NULL; + int num_args = 0; + token = strtok(command_copy, delimiter); + while (token != NULL) { + ++num_args; + token = strtok(NULL, delimiter); + } + free(command_copy); + + /* Allocate a NULL-terminated array for the tokens. */ + const char **command_args = malloc((num_args + 1) * sizeof(const char *)); + command_args[num_args] = NULL; + + /* Fill in the token array. */ + command_copy = strdup(command); + token = strtok(command_copy, delimiter); + int i = 0; + while (token != NULL) { + command_args[i] = strdup(token); + ++i; + token = strtok(NULL, delimiter); + } + free(command_copy); + + CHECK(num_args == i); + return command_args; +} + local_scheduler_state *init_local_scheduler( const char *node_ip_address, event_loop *loop, @@ -63,12 +244,13 @@ local_scheduler_state *init_local_scheduler( const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, + const double static_resource_conf[], const char *start_worker_command, - const double static_resource_conf[]) { + int num_workers) { local_scheduler_state *state = malloc(sizeof(local_scheduler_state)); /* Set the configuration struct for the local scheduler. */ if (start_worker_command != NULL) { - state->config.start_worker_command = strdup(start_worker_command); + state->config.start_worker_command = parse_command(start_worker_command); } else { state->config.start_worker_command = NULL; } @@ -123,53 +305,16 @@ local_scheduler_state *init_local_scheduler( } /* Print some debug information about resource configuration. */ print_resource_info(state, NULL); + + /* Start the initial set of workers. */ + utarray_new(state->child_pids, &pid_t_icd); + for (int i = 0; i < num_workers; ++i) { + start_worker(state); + } + return state; }; -void free_local_scheduler(local_scheduler_state *state) { - /* Free the command for starting new workers. 
*/ - if (state->config.start_worker_command != NULL) { - free(state->config.start_worker_command); - state->config.start_worker_command = NULL; - } - - /* Disconnect from the database. */ - if (state->db != NULL) { - db_disconnect(state->db); - state->db = NULL; - } - /* Disconnect from plasma. */ - plasma_disconnect(state->plasma_conn); - state->plasma_conn = NULL; - - /* Free the list of workers and any tasks that are still in progress on those - * workers. */ - for (int i = 0; i < utarray_len(state->workers); ++i) { - local_scheduler_client **worker = - (local_scheduler_client **) utarray_eltptr(state->workers, i); - if ((*worker)->task_in_progress != NULL) { - free_task((*worker)->task_in_progress); - (*worker)->task_in_progress = NULL; - } - free(*worker); - *worker = NULL; - } - utarray_free(state->workers); - state->workers = NULL; - - /* Free the algorithm state. */ - free_scheduling_algorithm_state(state->algorithm_state); - state->algorithm_state = NULL; - /* Free the input buffer. */ - utarray_free(state->input_buffer); - state->input_buffer = NULL; - /* Destroy the event loop. */ - event_loop_destroy(state->loop); - state->loop = NULL; - /* Free the scheduler state. */ - free(state); -} - void assign_task_to_worker(local_scheduler_state *state, task_spec *spec, local_scheduler_client *worker) { @@ -384,10 +529,33 @@ void process_message(event_loop *loop, } break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); - event_loop_remove_file(loop, client_sock); + kill_worker(worker, false); } break; case LOG_MESSAGE: { } break; + case REGISTER_PID: { + pid_t *worker_pid = (pid_t *) utarray_front(state->input_buffer); + worker->pid = *worker_pid; + + /* Determine if this worker is one of our child processes. 
*/ + LOG_DEBUG("Pid is %d", *worker_pid); + pid_t *child_pid; + int index = 0; + for (child_pid = (pid_t *) utarray_front(state->child_pids); + child_pid != NULL; + child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) { + if (*child_pid == *worker_pid) { + /* If this worker is one of our child processes, mark it as a child so + * that we know that we can wait for the process to exit during + * cleanup. */ + worker->is_child = true; + utarray_erase(state->child_pids, index, 1); + LOG_DEBUG("Found matching child pid %d", *worker_pid); + break; + } + ++index; + } + } break; default: /* This code should be unreachable. */ CHECK(0); @@ -405,6 +573,8 @@ void new_client_connection(event_loop *loop, local_scheduler_client *worker = malloc(sizeof(local_scheduler_client)); worker->sock = new_socket; worker->task_in_progress = NULL; + worker->pid = 0; + worker->is_child = false; worker->local_scheduler_state = state; utarray_push_back(state->workers, &worker); event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, @@ -417,6 +587,7 @@ void new_client_connection(event_loop *loop, local_scheduler_state *g_state; void signal_handler(int signal) { + LOG_DEBUG("Signal was %d", signal); if (signal == SIGTERM) { free_local_scheduler(g_state); exit(0); @@ -442,14 +613,6 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) { return LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS; } -void start_new_worker(local_scheduler_state *state) { - /* We can't start a worker if we don't have the path to the worker script. */ - CHECK(state->config.start_worker_command != NULL); - /* Launch the process to create the worker. 
*/ - FILE *p = popen(state->config.start_worker_command, "r"); - UNUSED(p); -} - void start_server(const char *node_ip_address, const char *socket_name, const char *redis_addr, @@ -458,8 +621,9 @@ void start_server(const char *node_ip_address, const char *plasma_manager_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, + const double static_resource_conf[], const char *start_worker_command, - const double static_resource_conf[]) { + int num_workers) { /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write * to a client that has already died, the local scheduler could die. */ signal(SIGPIPE, SIG_IGN); @@ -468,8 +632,8 @@ void start_server(const char *node_ip_address, g_state = init_local_scheduler( node_ip_address, loop, redis_addr, redis_port, socket_name, plasma_store_socket_name, plasma_manager_socket_name, - plasma_manager_address, global_scheduler_exists, start_worker_command, - static_resource_conf); + plasma_manager_address, global_scheduler_exists, static_resource_conf, + start_worker_command, num_workers); /* Register a callback for registering new clients. */ event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection, g_state); @@ -518,14 +682,16 @@ int main(int argc, char *argv[]) { char *plasma_manager_address = NULL; /* The IP address of the node that this local scheduler is running on. */ char *node_ip_address = NULL; - /* The command to run when starting new workers. */ - char *start_worker_command = NULL; /* Comma-separated list of configured resource capabilities for this node. */ char *static_resource_list = NULL; double static_resource_conf[MAX_RESOURCE_INDEX]; + /* The command to run when starting new workers. */ + char *start_worker_command = NULL; + /* The number of workers to start. 
*/ + char *num_workers_str = NULL; int c; bool global_scheduler_exists = true; - while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:c:")) != -1) { + while ((c = getopt(argc, argv, "s:r:p:m:ga:h:c:w:n:")) != -1) { switch (c) { case 's': scheduler_socket_name = optarg; @@ -548,11 +714,14 @@ int main(int argc, char *argv[]) { case 'h': node_ip_address = optarg; break; + case 'c': + static_resource_list = optarg; + break; case 'w': start_worker_command = optarg; break; - case 'c': - static_resource_list = optarg; + case 'n': + num_workers_str = optarg; break; default: LOG_FATAL("unknown option %c", c); @@ -585,6 +754,16 @@ int main(int argc, char *argv[]) { if (!node_ip_address) { LOG_FATAL("please specify the node IP address with -h switch"); } + int num_workers = 0; + if (num_workers_str) { + num_workers = strtol(num_workers_str, NULL, 10); + if (num_workers < 0) { + LOG_FATAL("Number of workers must be nonnegative"); + } + } + + char *redis_addr = NULL; + int redis_port = -1; if (!redis_addr_port) { /* Start the local scheduler without connecting to Redis. In this case, all * submitted tasks will be queued and scheduled locally. */ @@ -593,31 +772,34 @@ int main(int argc, char *argv[]) { "if a plasma manager socket name is provided with the -m switch, " "then a redis address must be provided with the -r switch"); } - start_server(node_ip_address, scheduler_socket_name, NULL, -1, - plasma_store_socket_name, NULL, plasma_manager_address, - global_scheduler_exists, start_worker_command, - static_resource_conf); } else { + char redis_addr_buffer[16] = {0}; + char redis_port_str[6] = {0}; /* Parse the Redis address into an IP address and a port. 
*/ - char redis_addr[16] = {0}; - char redis_port[6] = {0}; - int num_assigned = - sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port); + int num_assigned = sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", + redis_addr_buffer, redis_port_str); if (num_assigned != 2) { LOG_FATAL( "if a redis address is provided with the -r switch, it should be " "formatted like 127.0.0.1:6379"); } + redis_addr = redis_addr_buffer; + redis_port = strtol(redis_port_str, NULL, 10); + if (redis_port == 0) { + LOG_FATAL("Unable to parse port number from redis address %s", + redis_addr_port); + } if (!plasma_manager_socket_name) { LOG_FATAL( "please specify socket for connecting to Plasma manager with -m " "switch"); } - start_server(node_ip_address, scheduler_socket_name, &redis_addr[0], - atoi(redis_port), plasma_store_socket_name, - plasma_manager_socket_name, plasma_manager_address, - global_scheduler_exists, start_worker_command, - static_resource_conf); } + + LOG_INFO("Start worker command is %s", start_worker_command); + start_server(node_ip_address, scheduler_socket_name, redis_addr, redis_port, + plasma_store_socket_name, plasma_manager_socket_name, + plasma_manager_address, global_scheduler_exists, + static_resource_conf, start_worker_command, num_workers); } #endif diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 3fa3f9344..5e454804c 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -79,8 +79,9 @@ local_scheduler_state *init_local_scheduler( const char *plasma_store_socket_name, const char *plasma_manager_address, bool global_scheduler_exists, + const double static_resource_vector[], const char *worker_path, - const double static_resource_vector[]); + int num_workers); void free_local_scheduler(local_scheduler_state *state); @@ -90,6 +91,10 @@ void process_message(event_loop *loop, int client_sock, void *context, int events); + +void kill_worker(local_scheduler_client *worker, bool wait); + +void 
start_worker(local_scheduler_state *state); #endif #endif /* PHOTON_SCHEDULER_H */ diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index 414e2c31b..1ff10e907 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -33,25 +33,33 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { } typedef struct { - /** A socket to mock the Plasma store. */ - int plasma_fd; + /** A socket to mock the Plasma manager. Clients (such as workers) that + * connect to this file descriptor must be accepted. */ + int plasma_manager_fd; + /** A socket to communicate with the Plasma store. */ + int plasma_store_fd; /** Photon's socket for IPC requests. */ int photon_fd; /** Photon's local scheduler state. */ local_scheduler_state *photon_state; /** Photon's event loop. */ event_loop *loop; - /** A Photon client connection. */ - photon_conn *conn; + /** Number of Photon client connections, or mock workers. */ + int num_photon_conns; + /** Photon client connections. */ + photon_conn **conns; } photon_mock; -photon_mock *init_photon_mock(bool connect_to_redis) { +photon_mock *init_photon_mock(bool connect_to_redis, + int num_workers, + int num_mock_workers) { + const char *node_ip_address = "127.0.0.1"; const char *redis_addr = NULL; int redis_port = -1; const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS, DEFAULT_NUM_GPUS}; if (connect_to_redis) { - redis_addr = "127.0.0.1"; + redis_addr = node_ip_address; redis_port = 6379; } @@ -59,34 +67,60 @@ photon_mock *init_photon_mock(bool connect_to_redis) { memset(mock, 0, sizeof(photon_mock)); mock->loop = event_loop_create(); /* Bind to the Photon port and initialize the Photon scheduler. */ - /* TODO(rkn): Why are we reusing mock->plasma_fd for both the store and the - * manager? 
*/ - UT_string *plasma_manager_socket_name = - bind_ipc_sock_retry(plasma_manager_socket_name_format, &mock->plasma_fd); - mock->plasma_fd = socket_connect_retry(plasma_store_socket_name, 5, 100); + UT_string *plasma_manager_socket_name = bind_ipc_sock_retry( + plasma_manager_socket_name_format, &mock->plasma_manager_fd); + mock->plasma_store_fd = + socket_connect_retry(plasma_store_socket_name, 5, 100); UT_string *photon_socket_name = bind_ipc_sock_retry(photon_socket_name_format, &mock->photon_fd); - CHECK(mock->plasma_fd >= 0 && mock->photon_fd >= 0); + CHECK(mock->plasma_store_fd >= 0 && mock->photon_fd >= 0); + + UT_string *worker_command; + utstring_new(worker_command); + utstring_printf(worker_command, + "python ../../python/ray/workers/default_worker.py " + "--node-ip-address=%s --object-store-name=%s " + "--object-store-manager-name=%s --local-scheduler-name=%s " + "--redis-address=%s:%d", + node_ip_address, plasma_store_socket_name, + utstring_body(plasma_manager_socket_name), + utstring_body(photon_socket_name), redis_addr, redis_port); + mock->photon_state = init_local_scheduler( "127.0.0.1", mock->loop, redis_addr, redis_port, utstring_body(photon_socket_name), plasma_store_socket_name, - utstring_body(plasma_manager_socket_name), NULL, false, NULL, - static_resource_conf); + utstring_body(plasma_manager_socket_name), NULL, false, + static_resource_conf, utstring_body(worker_command), num_workers); + + /* Accept the workers as clients to the plasma manager. */ + for (int i = 0; i < num_workers; ++i) { + accept_client(mock->plasma_manager_fd); + } + /* Connect a Photon client. 
*/ - mock->conn = photon_connect(utstring_body(photon_socket_name)); - new_client_connection(mock->loop, mock->photon_fd, - (void *) mock->photon_state, 0); + mock->num_photon_conns = num_mock_workers; + mock->conns = malloc(sizeof(photon_conn *) * num_mock_workers); + for (int i = 0; i < num_mock_workers; ++i) { + mock->conns[i] = photon_connect(utstring_body(photon_socket_name)); + new_client_connection(mock->loop, mock->photon_fd, + (void *) mock->photon_state, 0); + } + + utstring_free(worker_command); utstring_free(plasma_manager_socket_name); utstring_free(photon_socket_name); return mock; } void destroy_photon_mock(photon_mock *mock) { - photon_disconnect(mock->conn); - close(mock->photon_fd); - close(mock->plasma_fd); + for (int i = 0; i < mock->num_photon_conns; ++i) { + photon_disconnect(mock->conns[i]); + } + free(mock->conns); /* This also frees mock->loop. */ free_local_scheduler(mock->photon_state); + close(mock->plasma_store_fd); + close(mock->plasma_manager_fd); free(mock); } @@ -103,7 +137,9 @@ void reset_worker(photon_mock *mock, local_scheduler_client *worker) { * value, the task should get assigned to a worker again. */ TEST object_reconstruction_test(void) { - photon_mock *photon = init_photon_mock(true); + photon_mock *photon = init_photon_mock(true, 0, 1); + photon_conn *worker = photon->conns[0]; + /* Create a task with zero dependencies and one return value. */ task_spec *spec = example_task_spec(0, 1); object_id return_id = task_return(spec, 0); @@ -125,10 +161,10 @@ TEST object_reconstruction_test(void) { if (pid == 0) { /* Make sure we receive the task twice. First from the initial submission, * and second from the reconstruct request. 
*/ - photon_submit(photon->conn, spec); - task_spec *task_assigned = photon_get_task(photon->conn); + photon_submit(worker, spec); + task_spec *task_assigned = photon_get_task(worker); ASSERT_EQ(memcmp(task_assigned, spec, task_spec_size(spec)), 0); - task_spec *reconstruct_task = photon_get_task(photon->conn); + task_spec *reconstruct_task = photon_get_task(worker); ASSERT_EQ(memcmp(reconstruct_task, spec, task_spec_size(spec)), 0); /* Clean up. */ free_task_spec(reconstruct_task); @@ -150,7 +186,7 @@ TEST object_reconstruction_test(void) { (retry_info *) &photon_retry, NULL, NULL); /* Trigger reconstruction, and run the event loop again. */ object_id return_id = task_return(spec, 0); - photon_reconstruct_object(photon->conn, return_id); + photon_reconstruct_object(worker, return_id); event_loop_add_timer(photon->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(photon->loop); @@ -171,7 +207,8 @@ TEST object_reconstruction_test(void) { * should trigger reconstruction of all previous tasks in the lineage. */ TEST object_reconstruction_recursive_test(void) { - photon_mock *photon = init_photon_mock(true); + photon_mock *photon = init_photon_mock(true, 0, 1); + photon_conn *worker = photon->conns[0]; /* Create a chain of tasks, each one dependent on the one before it. Mark * each object as available so that tasks will run immediately. */ const int NUM_TASKS = 10; @@ -204,11 +241,11 @@ TEST object_reconstruction_recursive_test(void) { if (pid == 0) { /* Submit the tasks, and make sure each one gets assigned to a worker. */ for (int i = 0; i < NUM_TASKS; ++i) { - photon_submit(photon->conn, specs[i]); + photon_submit(worker, specs[i]); } /* Make sure we receive each task from the initial submission. 
*/ for (int i = 0; i < NUM_TASKS; ++i) { - task_spec *task_assigned = photon_get_task(photon->conn); + task_spec *task_assigned = photon_get_task(worker); ASSERT_EQ(memcmp(task_assigned, specs[i], task_spec_size(task_assigned)), 0); free_task_spec(task_assigned); @@ -216,7 +253,7 @@ TEST object_reconstruction_recursive_test(void) { /* Check that the workers receive all tasks in the final return object's * lineage during reconstruction. */ for (int i = 0; i < NUM_TASKS; ++i) { - task_spec *task_assigned = photon_get_task(photon->conn); + task_spec *task_assigned = photon_get_task(worker); bool found = false; for (int j = 0; j < NUM_TASKS; ++j) { if (specs[j] == NULL) { @@ -249,7 +286,7 @@ TEST object_reconstruction_recursive_test(void) { /* Trigger reconstruction for the last object, and run the event loop * again. */ object_id return_id = task_return(specs[NUM_TASKS - 1], 0); - photon_reconstruct_object(photon->conn, return_id); + photon_reconstruct_object(worker, return_id); event_loop_add_timer(photon->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(photon->loop); @@ -275,25 +312,27 @@ task_spec *object_reconstruction_suppression_spec; void object_reconstruction_suppression_callback(object_id object_id, void *user_context) { /* Submit the task after adding the object to the object table. */ - photon_mock *photon = user_context; - photon_submit(photon->conn, object_reconstruction_suppression_spec); + photon_conn *worker = user_context; + photon_submit(worker, object_reconstruction_suppression_spec); } TEST object_reconstruction_suppression_test(void) { - photon_mock *photon = init_photon_mock(true); + photon_mock *photon = init_photon_mock(true, 0, 1); + photon_conn *worker = photon->conns[0]; + object_reconstruction_suppression_spec = example_task_spec(0, 1); object_id return_id = task_return(object_reconstruction_suppression_spec, 0); pid_t pid = fork(); if (pid == 0) { /* Make sure we receive the task once. 
This will block until the * object_table_add callback completes. */ - task_spec *task_assigned = photon_get_task(photon->conn); + task_spec *task_assigned = photon_get_task(worker); ASSERT_EQ(memcmp(task_assigned, object_reconstruction_suppression_spec, task_spec_size(object_reconstruction_suppression_spec)), 0); /* Trigger a reconstruction. We will check that no tasks get queued as a * result of this line in the event loop process. */ - photon_reconstruct_object(photon->conn, return_id); + photon_reconstruct_object(worker, return_id); /* Clean up. */ free_task_spec(task_assigned); free_task_spec(object_reconstruction_suppression_spec); @@ -309,7 +348,7 @@ TEST object_reconstruction_suppression_test(void) { object_table_add(db, return_id, 1, (unsigned char *) NIL_DIGEST, (retry_info *) &photon_retry, object_reconstruction_suppression_callback, - (void *) photon); + (void *) worker); /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ event_loop_add_timer(photon->loop, 1000, @@ -328,7 +367,7 @@ TEST object_reconstruction_suppression_test(void) { } TEST task_dependency_test(void) { - photon_mock *photon = init_photon_mock(false); + photon_mock *photon = init_photon_mock(false, 0, 1); local_scheduler_state *state = photon->photon_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state; /* Get the first worker. */ @@ -403,7 +442,7 @@ TEST task_dependency_test(void) { } TEST task_multi_dependency_test(void) { - photon_mock *photon = init_photon_mock(false); + photon_mock *photon = init_photon_mock(false, 0, 1); local_scheduler_state *state = photon->photon_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state; /* Get the first worker. */ @@ -476,12 +515,77 @@ TEST task_multi_dependency_test(void) { PASS(); } +TEST start_kill_workers_test(void) { + /* Start some workers. 
*/ + int num_workers = 4; + photon_mock *photon = init_photon_mock(true, num_workers, 0); + /* We start off with num_workers child processes, but no workers + * registered yet. */ + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); + ASSERT_EQ(utarray_len(photon->photon_state->workers), 0); + + /* Make sure that each worker connects to the photon scheduler. This for loop + * will hang if one of the workers does not connect. */ + for (int i = 0; i < num_workers; ++i) { + new_client_connection(photon->loop, photon->photon_fd, + (void *) photon->photon_state, 0); + } + + /* After handling each worker's initial connection, we should now have all + * workers accounted for, but we haven't yet matched up process IDs with our + * child processes. */ + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + + /* Each worker should register its process ID. */ + for (int i = 0; i < utarray_len(photon->photon_state->workers); ++i) { + local_scheduler_client *worker = + *(local_scheduler_client **) utarray_eltptr( + photon->photon_state->workers, i); + process_message(photon->photon_state->loop, worker->sock, worker, 0); + } + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + + /* After killing a worker, its state is cleaned up. */ + local_scheduler_client *worker = *(local_scheduler_client **) utarray_eltptr( + photon->photon_state->workers, 0); + kill_worker(worker, true); + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); + + /* Start a worker after the local scheduler has been initialized. */ + start_worker(photon->photon_state); + /* Accept the new worker as a client of the plasma manager.
*/ + int new_worker_fd = accept_client(photon->plasma_manager_fd); + /* The new worker has started but has not yet connected or registered. */ + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); + /* Make sure the new worker connects to the photon scheduler. */ + new_client_connection(photon->loop, photon->photon_fd, + (void *) photon->photon_state, 0); + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + /* Make sure that the new worker registers its process ID. */ + worker = *(local_scheduler_client **) utarray_eltptr( + photon->photon_state->workers, num_workers - 1); + process_message(photon->photon_state->loop, worker->sock, worker, 0); + ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); + ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers); + + /* Clean up. */ + close(new_worker_fd); + destroy_photon_mock(photon); + PASS(); +} + SUITE(photon_tests) { RUN_REDIS_TEST(object_reconstruction_test); RUN_REDIS_TEST(object_reconstruction_recursive_test); RUN_REDIS_TEST(object_reconstruction_suppression_test); RUN_TEST(task_dependency_test); RUN_TEST(task_multi_dependency_test); + RUN_TEST(start_kill_workers_test); } GREATEST_MAIN_DEFS(); diff --git a/test/component_failures_test.py b/test/component_failures_test.py index a6e385609..e1bf0219a 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -16,7 +16,10 @@ class ComponentFailureTest(unittest.TestCase): def f(): ray.worker.global_worker.plasma_client.get(obj_id) - ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.worker._init(num_workers=1, + driver_mode=ray.SILENT_MODE, + start_workers_from_local_scheduler=False, + start_ray_local=True) # Have the worker wait in a get call.
f.remote() @@ -44,7 +47,10 @@ class ComponentFailureTest(unittest.TestCase): def f(): ray.worker.global_worker.plasma_client.wait([obj_id]) - ray.init(num_workers=1, driver_mode=ray.SILENT_MODE) + ray.worker._init(num_workers=1, + driver_mode=ray.SILENT_MODE, + start_workers_from_local_scheduler=False, + start_ray_local=True) # Have the worker wait in a get call. f.remote() diff --git a/test/runtest.py b/test/runtest.py index 24aeff7a1..702725f7c 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -183,6 +183,22 @@ class SerializationTest(unittest.TestCase): class WorkerTest(unittest.TestCase): + def testPythonWorkers(self): + # Test the codepath for starting workers from the Python script, instead of + # the local scheduler. This codepath is for debugging purposes only. + num_workers = 4 + ray.worker._init(num_workers=num_workers, + start_workers_from_local_scheduler=False, + start_ray_local=True) + + @ray.remote + def f(x): + return x + + values = ray.get([f.remote(1) for i in range(num_workers * 2)]) + self.assertEqual(values, [1] * (num_workers * 2)) + ray.worker.cleanup() + def testPutGet(self): ray.init(num_workers=0)