Start and clean up workers from the local scheduler. (#250)

* Start and clean up workers from the local scheduler

Ability to kill workers in photon scheduler

Test for old method of starting workers

Common codepath for killing workers

Common codepath for killing workers

Photon test case for starting and killing workers

fix build

Fix component failure test

Register a worker's pid as part of initial connection

Address comments and revert photon_connect

Set PATH during travis install

Fix

* Fix photon test case to accept clients on plasma manager fd
This commit is contained in:
Stephanie Wang
2017-02-10 12:46:23 -08:00
committed by Robert Nishihara
parent ec175b7dfb
commit 2b8e6485e3
12 changed files with 556 additions and 165 deletions
+2
View File
@@ -53,6 +53,8 @@ install:
- ./.travis/install-dependencies.sh
- ./.travis/install-ray.sh
- if [[ "$PYTHON" == "3.5" ]]; then export PATH="$HOME/miniconda/bin:$PATH"; fi
- cd python/core
- bash ../../src/common/test/run_tests.sh
- bash ../../src/plasma/test/run_tests.sh
+19 -6
View File
@@ -11,11 +11,17 @@ import time
def random_name():
  """Return a random integer in [0, 99999999] rendered as a decimal string.

  The result is used as a suffix to build socket names that are unlikely to
  collide between processes started on the same machine.
  """
  suffix = random.randint(0, 99999999)
  return str(suffix)
def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
worker_path=None, plasma_address=None,
node_ip_address="127.0.0.1", redis_address=None,
use_valgrind=False, use_profiler=False,
redirect_output=False, static_resource_list=None):
def start_local_scheduler(plasma_store_name,
plasma_manager_name=None,
worker_path=None,
plasma_address=None,
node_ip_address="127.0.0.1",
redis_address=None,
use_valgrind=False,
use_profiler=False,
redirect_output=False,
static_resource_list=None,
num_workers=0):
"""Start a local scheduler process.
Args:
@@ -41,6 +47,8 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
static_resource_list (list): A list of integers specifying the local
scheduler's resource capacities. The resources should appear in an order
matching the order defined in task.h.
num_workers (int): The number of workers that the local scheduler should
start.
Return:
A tuple of the name of the local scheduler socket and the process ID of the
@@ -52,7 +60,12 @@ def start_local_scheduler(plasma_store_name, plasma_manager_name=None,
raise Exception("Cannot use valgrind and profiler at the same time.")
local_scheduler_executable = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../core/src/photon/photon_scheduler")
local_scheduler_name = "/tmp/scheduler{}".format(random_name())
command = [local_scheduler_executable, "-s", local_scheduler_name, "-p", plasma_store_name, "-h", node_ip_address]
command = [local_scheduler_executable,
"-s", local_scheduler_name,
"-p", plasma_store_name,
"-h", node_ip_address,
"-n", str(num_workers),
]
if plasma_manager_name is not None:
command += ["-m", plasma_manager_name]
if worker_path is not None:
+71 -27
View File
@@ -13,6 +13,7 @@ import subprocess
import sys
import time
from collections import namedtuple, OrderedDict
import threading
# Ray modules
import photon
@@ -89,18 +90,24 @@ def kill_process(p):
if RUN_PHOTON_PROFILER or RUN_PLASMA_MANAGER_PROFILER or RUN_PLASMA_STORE_PROFILER:
os.kill(p.pid, signal.SIGINT) # Give process signal to write profiler data.
time.sleep(0.1) # Wait for profiling data to be written.
p.kill()
# Sleeping for 0 should yield the core and allow the killed process to process
# its pending signals.
time.sleep(0)
# Allow the process one second to exit gracefully.
p.terminate()
timer = threading.Timer(1, lambda p: p.kill(), [p])
try:
timer.start()
p.wait()
finally:
timer.cancel()
if p.poll() is not None:
return True
p.terminate()
# Sleeping for 0 should yield the core and allow the killed process to process
# its pending signals.
time.sleep(0)
if p.poll is not None:
# If the process did not exit within one second, force kill it.
p.kill()
if p.poll() is not None:
return True
# The process was not killed for some reason.
return False
@@ -262,10 +269,16 @@ def start_global_scheduler(redis_address, cleanup=True, redirect_output=False):
if cleanup:
all_processes[PROCESS_TYPE_GLOBAL_SCHEDULER].append(p)
def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
plasma_manager_name, worker_path, plasma_address=None,
cleanup=True, redirect_output=False,
static_resource_list=None):
def start_local_scheduler(redis_address,
node_ip_address,
plasma_store_name,
plasma_manager_name,
worker_path,
plasma_address=None,
cleanup=True,
redirect_output=False,
static_resource_list=None,
num_workers=0):
"""Start a local scheduler process.
Args:
@@ -284,6 +297,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
/dev/null.
static_resource_list (list): An ordered list of the configured resource
capacities for this local scheduler.
num_workers (int): The number of workers that the local scheduler should
start.
Return:
The name of the local scheduler socket.
@@ -296,7 +311,8 @@ def start_local_scheduler(redis_address, node_ip_address, plasma_store_name,
plasma_address=plasma_address,
use_profiler=RUN_PHOTON_PROFILER,
redirect_output=redirect_output,
static_resource_list=static_resource_list)
static_resource_list=static_resource_list,
num_workers=num_workers)
if cleanup:
all_processes[PROCESS_TYPE_LOCAL_SCHEDULER].append(p)
return local_scheduler_name
@@ -391,6 +407,7 @@ def start_ray_processes(address_info=None,
redirect_output=False,
include_global_scheduler=False,
include_redis=False,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None):
"""Helper method to start Ray processes.
@@ -417,6 +434,9 @@ def start_ray_processes(address_info=None,
start a global scheduler process.
include_redis (bool): If include_redis is True, then start a Redis server
process.
start_workers_from_local_scheduler (bool): If this flag is True, then start
the initial workers from the local scheduler. Else, start them from
Python.
num_cpus: A list of length num_local_schedulers containing the number of
CPUs each local scheduler should be configured with.
num_gpus: A list of length num_local_schedulers containing the number of
@@ -489,12 +509,25 @@ def start_ray_processes(address_info=None,
object_store_addresses.append(object_store_address)
time.sleep(0.1)
# Determine how many workers to start for each local scheduler.
num_workers_per_local_scheduler = [0] * num_local_schedulers
for i in range(num_workers):
num_workers_per_local_scheduler[i % num_local_schedulers] += 1
# Start any local schedulers that do not yet exist.
for i in range(len(local_scheduler_socket_names), num_local_schedulers):
# Connect the local scheduler to the object store at the same index.
object_store_address = object_store_addresses[i]
plasma_address = "{}:{}".format(node_ip_address,
object_store_address.manager_port)
# Determine how many workers this local scheduler should start.
if start_workers_from_local_scheduler:
num_local_scheduler_workers = num_workers_per_local_scheduler[i]
num_workers_per_local_scheduler[i] = 0
else:
# If we're starting the workers from Python, the local scheduler should
# not start any workers.
num_local_scheduler_workers = 0
# Start the local scheduler.
local_scheduler_name = start_local_scheduler(redis_address,
node_ip_address,
@@ -504,7 +537,8 @@ def start_ray_processes(address_info=None,
plasma_address=plasma_address,
cleanup=cleanup,
redirect_output=redirect_output,
static_resource_list=[num_cpus[i], num_gpus[i]])
static_resource_list=[num_cpus[i], num_gpus[i]],
num_workers=num_local_scheduler_workers)
local_scheduler_socket_names.append(local_scheduler_name)
time.sleep(0.1)
@@ -513,18 +547,23 @@ def start_ray_processes(address_info=None,
assert len(object_store_addresses) == num_local_schedulers
assert len(local_scheduler_socket_names) == num_local_schedulers
# Start the workers.
for i in range(num_workers):
object_store_address = object_store_addresses[i % num_local_schedulers]
local_scheduler_name = local_scheduler_socket_names[i % num_local_schedulers]
start_worker(node_ip_address,
object_store_address.name,
object_store_address.manager_name,
local_scheduler_name,
redis_address,
worker_path,
cleanup=cleanup,
redirect_output=redirect_output)
# Start any workers that the local scheduler has not already started.
for i, num_local_scheduler_workers in enumerate(num_workers_per_local_scheduler):
object_store_address = object_store_addresses[i]
local_scheduler_name = local_scheduler_socket_names[i]
for j in range(num_local_scheduler_workers):
start_worker(node_ip_address,
object_store_address.name,
object_store_address.manager_name,
local_scheduler_name,
redis_address,
worker_path,
cleanup=cleanup,
redirect_output=redirect_output)
num_workers_per_local_scheduler[i] -= 1
# Make sure that we've started all the workers.
assert(sum(num_workers_per_local_scheduler) == 0)
# Return the addresses of the relevant processes.
return address_info
@@ -581,6 +620,7 @@ def start_ray_head(address_info=None,
worker_path=None,
cleanup=True,
redirect_output=False,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None):
"""Start Ray in local mode.
@@ -603,6 +643,9 @@ def start_ray_head(address_info=None,
method exits.
redirect_output (bool): True if stdout and stderr should be redirected to
/dev/null.
start_workers_from_local_scheduler (bool): If this flag is True, then start
the initial workers from the local scheduler. Else, start them from
Python.
num_cpus (int): number of cpus to configure the local scheduler with.
num_gpus (int): number of gpus to configure the local scheduler with.
@@ -619,5 +662,6 @@ def start_ray_head(address_info=None,
redirect_output=redirect_output,
include_global_scheduler=True,
include_redis=True,
start_workers_from_local_scheduler=start_workers_from_local_scheduler,
num_cpus=num_cpus,
num_gpus=num_gpus)
+15 -4
View File
@@ -735,9 +735,15 @@ def get_address_info_from_redis(redis_address, node_ip_address, num_retries=5):
time.sleep(1)
counter += 1
def _init(address_info=None, start_ray_local=False, object_id_seed=None,
num_workers=None, num_local_schedulers=None,
driver_mode=SCRIPT_MODE, num_cpus=None, num_gpus=None):
def _init(address_info=None,
start_ray_local=False,
object_id_seed=None,
num_workers=None,
num_local_schedulers=None,
driver_mode=SCRIPT_MODE,
start_workers_from_local_scheduler=True,
num_cpus=None,
num_gpus=None):
"""Helper method to connect to an existing Ray cluster or start a new one.
This method handles two cases. Either a Ray cluster already exists and we
@@ -764,6 +770,9 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
only provided if start_ray_local is True.
driver_mode (bool): The mode in which to start the driver. This should be
one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
start_workers_from_local_scheduler (bool): If this flag is True, then start
the initial workers from the local scheduler. Else, start them from
Python. The latter case is for debugging purposes only.
num_cpus: A list containing the number of CPUs the local schedulers should
be configured with.
num_gpus: A list containing the number of GPUs the local schedulers should
@@ -815,7 +824,9 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
node_ip_address=node_ip_address,
num_workers=num_workers,
num_local_schedulers=num_local_schedulers,
num_cpus=num_cpus, num_gpus=num_gpus)
start_workers_from_local_scheduler=start_workers_from_local_scheduler,
num_cpus=num_cpus,
num_gpus=num_gpus)
else:
if redis_address is None:
raise Exception("If start_ray_local=False, then redis_address must be provided.")
+13 -11
View File
@@ -27,20 +27,14 @@ enum photon_message_type {
RECONSTRUCT_OBJECT,
/** Log a message to the event table. */
EVENT_LOG_MESSAGE,
/** Register a worker's process ID with the local scheduler. */
REGISTER_PID,
};
// clang-format off
/** Contains all information that is associated to a worker. */
typedef struct {
int sock;
/** A pointer to a task object, to update the task table. */
task *task_in_progress;
} worker;
// clang-format on
/* These are needed to define the UT_arrays. */
UT_icd task_ptr_icd;
UT_icd worker_icd;
UT_icd workers_icd;
UT_icd pid_t_icd;
/** Internal state of the scheduling algorithm. */
typedef struct scheduling_algorithm_state scheduling_algorithm_state;
@@ -50,7 +44,7 @@ typedef struct scheduling_algorithm_state scheduling_algorithm_state;
* scheduler. */
typedef struct {
/** The script to use when starting a new worker. */
char *start_worker_command;
const char **start_worker_command;
/** Whether there is a global scheduler. */
bool global_scheduler_exists;
} local_scheduler_config;
@@ -65,6 +59,9 @@ typedef struct {
* structs when we free the scheduler state and also to access the worker
* structs in the tests. */
UT_array *workers;
/** List of the process IDs for child processes (workers) started by the
* local scheduler that have not sent a REGISTER_PID message yet. */
UT_array *child_pids;
/** The handle to the database. */
db_handle *db;
/** The Plasma client. */
@@ -90,6 +87,11 @@ typedef struct {
* no task is running on the worker, this will be NULL. This is used to
* update the task table. */
task *task_in_progress;
/** The process ID of the client. If this is set to zero, the client has not
* yet registered a process ID. */
pid_t pid;
/** Whether the client is a child process of the local scheduler. */
bool is_child;
/** A pointer to the local scheduler state. */
local_scheduler_state *local_scheduler_state;
} local_scheduler_client;
+5
View File
@@ -7,6 +7,11 @@
/**
 * Connect to the local scheduler listening on the given IPC socket and
 * register this process's ID with it.
 *
 * @param photon_socket The name of the local scheduler's IPC socket.
 * @return A newly allocated connection object. The caller owns it.
 */
photon_conn *photon_connect(const char *photon_socket) {
  photon_conn *connection = malloc(sizeof(photon_conn));
  connection->conn = connect_ipc_sock(photon_socket);
  /* Send our process ID as the first message on the connection. This is done
   * unconditionally for every client; the local scheduler decides whether
   * the pid belongs to one of the workers it started itself. */
  pid_t pid = getpid();
  int status = write_message(connection->conn, REGISTER_PID, sizeof(pid),
                             (uint8_t *) &pid);
  CHECKM(status == 0, "Unable to register worker with local scheduler");
  return connection;
}
+1
View File
@@ -20,6 +20,7 @@ static int PyPhotonClient_init(PyPhotonClient *self,
if (!PyArg_ParseTuple(args, "s", &socket_name)) {
return -1;
}
/* Connect to the Photon scheduler. */
self->photon_connection = photon_connect(socket_name);
return 0;
}
+258 -76
View File
@@ -4,6 +4,7 @@
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>
#include "common.h"
@@ -23,6 +24,8 @@
UT_icd task_ptr_icd = {sizeof(task *), NULL, NULL, NULL};
UT_icd workers_icd = {sizeof(local_scheduler_client *), NULL, NULL, NULL};
UT_icd pid_t_icd = {sizeof(pid_t), NULL, NULL, NULL};
UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
/**
@@ -53,6 +56,184 @@ void print_resource_info(const local_scheduler_state *state,
#endif
}
/**
 * Remove a worker from the local scheduler and release everything it owns.
 * If the worker is a child process that has registered its process ID, the
 * process is also sent SIGKILL.
 *
 * @param worker The worker to remove. The struct is freed by this call, so
 *        the pointer must not be used afterwards.
 * @param wait If true and the worker is a registered child process, block
 *        until that process has exited. Ignored for all other workers.
 * @return Void.
 */
void kill_worker(local_scheduler_client *worker, bool wait) {
  local_scheduler_state *state = worker->local_scheduler_state;
  /* Remember how many workers we had so that we can verify below how many
   * entries the scan removed. */
  int count_before = utarray_len(state->workers);
  int index = 0;
  while (index < utarray_len(state->workers)) {
    local_scheduler_client **entry =
        (local_scheduler_client **) utarray_eltptr(state->workers, index);
    if (*entry == worker) {
      utarray_erase(state->workers, index, 1);
    }
    ++index;
  }
  /* Exactly one entry must have been removed: more means the worker was
   * listed twice, none means it was never listed. */
  CHECKM(utarray_len(state->workers) >= count_before - 1,
         "Found duplicate workers");
  CHECKM(count_before != utarray_len(state->workers),
         "Tried to kill worker that doesn't exist");

  /* Stop watching the client socket before killing the process so that the
   * event loop never processes the SIGPIPE caused by the worker's death. */
  event_loop_remove_file(state->loop, worker->sock);

  /* We only signal processes that we started ourselves and that have told us
   * their process ID. */
  bool registered_child = worker->is_child && worker->pid != 0;
  if (registered_child) {
    kill(worker->pid, SIGKILL);
    if (wait) {
      /* Block until the process has exited. */
      waitpid(worker->pid, NULL, 0);
    }
  }

  /* Close the socket only after the kill so that the worker can't receive a
   * SIGPIPE before it exits. */
  close(worker->sock);

  /* Release the task that was running on the worker, if any. */
  if (worker->task_in_progress != NULL) {
    /* TODO(swang): Update the task table to mark the task as lost. */
    free_task(worker->task_in_progress);
  }

  LOG_DEBUG("Killed worker with pid %d", worker->pid);
  free(worker);
}
/**
 * Tear down the local scheduler: release the worker command, disconnect from
 * the database and from plasma, kill all child processes and workers, and
 * free every remaining piece of scheduler state, including the state struct
 * itself.
 *
 * @param state The local scheduler state to free. Must not be used after
 *        this call.
 * @return Void.
 */
void free_local_scheduler(local_scheduler_state *state) {
  /* Release the NULL-terminated argument vector used to start workers. */
  const char **command = state->config.start_worker_command;
  if (command != NULL) {
    for (int i = 0; command[i] != NULL; ++i) {
      free((void *) command[i]);
    }
    free(command);
    state->config.start_worker_command = NULL;
  }

  /* Drop the database connection, if we have one. */
  if (state->db != NULL) {
    db_disconnect(state->db);
    state->db = NULL;
  }

  /* Drop the plasma connection. */
  plasma_disconnect(state->plasma_conn);
  state->plasma_conn = NULL;

  /* Child processes that never registered as a worker are not in the worker
   * list, so kill and reap them here. */
  pid_t *child = (pid_t *) utarray_front(state->child_pids);
  while (child != NULL) {
    kill(*child, SIGKILL);
    waitpid(*child, NULL, 0);
    LOG_DEBUG("Killed pid %d", *child);
    child = (pid_t *) utarray_next(state->child_pids, child);
  }
  utarray_free(state->child_pids);

  /* kill_worker removes the worker from the list, so keep taking the front
   * element until the list is empty. This also frees any task still in
   * progress on each worker. */
  local_scheduler_client **remaining;
  while ((remaining = (local_scheduler_client **) utarray_front(
              state->workers)) != NULL) {
    kill_worker(*remaining, true);
  }
  utarray_free(state->workers);
  state->workers = NULL;

  /* Release the scheduling algorithm's state. */
  free_scheduling_algorithm_state(state->algorithm_state);
  state->algorithm_state = NULL;

  /* Release the buffer used for incoming messages. */
  utarray_free(state->input_buffer);
  state->input_buffer = NULL;

  /* Tear down the event loop. */
  event_loop_destroy(state->loop);
  state->loop = NULL;

  /* Finally, release the state struct itself. */
  free(state);
}
/**
 * Start a new worker as a child process.
 *
 * The child's process ID is recorded in state->child_pids so that the local
 * scheduler can match it against the pid the worker later registers, and can
 * kill the process during cleanup if it never registers.
 *
 * @param state The state of the local scheduler.
 * @return Void.
 */
void start_worker(local_scheduler_state *state) {
  /* We can't start a worker if we don't have the path to the worker script. */
  CHECK(state->config.start_worker_command != NULL);
  /* Launch the process to create the worker. */
  pid_t pid = fork();
  if (pid < 0) {
    /* fork() failed and no child exists. The error value must never be
     * recorded as a child pid: cleanup sends SIGKILL to every recorded pid,
     * and kill(-1, SIGKILL) would signal every process we are permitted to
     * signal. */
    LOG_FATAL("Failed to fork worker process");
    return;
  }
  if (pid != 0) {
    /* Parent: remember the child until it registers itself as a worker. */
    utarray_push_back(state->child_pids, &pid);
    LOG_DEBUG("Started worker with pid %d", pid);
    return;
  }
  /* Child: replace this process with the worker command. execvp only returns
   * if it failed. */
  execvp(state->config.start_worker_command[0],
         (char *const *) state->config.start_worker_command);
  /* NOTE(review): this runs in the forked child, whose copy of the state
   * still lists the parent's other children; confirm that tearing the state
   * down here cannot signal sibling workers. */
  free_local_scheduler(state);
  LOG_FATAL("Failed to start worker");
}
/**
 * Parse the command to start a worker. This takes in the command string,
 * splits it into tokens on the space characters, and allocates an array of the
 * tokens, terminated by a NULL pointer. Runs of consecutive spaces, as well as
 * leading and trailing spaces, produce no empty tokens. No quoting or escaping
 * is interpreted.
 *
 * The input string is not modified. Both the returned array and each token in
 * it are heap-allocated and owned by the caller.
 *
 * @param command The command string to start a worker. Must not be NULL.
 * @return A pointer to an array of strings, the tokens in the command string.
 *         The last element is a NULL pointer.
 */
const char **parse_command(const char *command) {
  const char *delimiter = " ";
  CHECK(command != NULL);

  /* First pass: count the tokens. We scan with strspn/strcspn rather than
   * strtok so that we neither need a scratch copy of the command nor disturb
   * strtok's hidden global state. */
  int num_args = 0;
  const char *cursor = command + strspn(command, delimiter);
  while (*cursor != '\0') {
    ++num_args;
    cursor += strcspn(cursor, delimiter);
    cursor += strspn(cursor, delimiter);
  }

  /* Allocate a NULL-terminated array for the tokens. */
  const char **command_args = malloc((num_args + 1) * sizeof(const char *));
  CHECKM(command_args != NULL, "Unable to allocate worker command array");
  command_args[num_args] = NULL;

  /* Second pass: copy each token into its own allocation. */
  int i = 0;
  cursor = command + strspn(command, delimiter);
  while (*cursor != '\0') {
    size_t token_length = strcspn(cursor, delimiter);
    char *token = malloc(token_length + 1);
    CHECKM(token != NULL, "Unable to allocate worker command token");
    memcpy(token, cursor, token_length);
    token[token_length] = '\0';
    command_args[i] = token;
    ++i;
    /* Skip past the token and any spaces that follow it. */
    cursor += token_length;
    cursor += strspn(cursor, delimiter);
  }
  CHECK(num_args == i);
  return command_args;
}
local_scheduler_state *init_local_scheduler(
const char *node_ip_address,
event_loop *loop,
@@ -63,12 +244,13 @@ local_scheduler_state *init_local_scheduler(
const char *plasma_manager_socket_name,
const char *plasma_manager_address,
bool global_scheduler_exists,
const double static_resource_conf[],
const char *start_worker_command,
const double static_resource_conf[]) {
int num_workers) {
local_scheduler_state *state = malloc(sizeof(local_scheduler_state));
/* Set the configuration struct for the local scheduler. */
if (start_worker_command != NULL) {
state->config.start_worker_command = strdup(start_worker_command);
state->config.start_worker_command = parse_command(start_worker_command);
} else {
state->config.start_worker_command = NULL;
}
@@ -123,53 +305,16 @@ local_scheduler_state *init_local_scheduler(
}
/* Print some debug information about resource configuration. */
print_resource_info(state, NULL);
/* Start the initial set of workers. */
utarray_new(state->child_pids, &pid_t_icd);
for (int i = 0; i < num_workers; ++i) {
start_worker(state);
}
return state;
};
void free_local_scheduler(local_scheduler_state *state) {
/* Free the command for starting new workers. */
if (state->config.start_worker_command != NULL) {
free(state->config.start_worker_command);
state->config.start_worker_command = NULL;
}
/* Disconnect from the database. */
if (state->db != NULL) {
db_disconnect(state->db);
state->db = NULL;
}
/* Disconnect from plasma. */
plasma_disconnect(state->plasma_conn);
state->plasma_conn = NULL;
/* Free the list of workers and any tasks that are still in progress on those
* workers. */
for (int i = 0; i < utarray_len(state->workers); ++i) {
local_scheduler_client **worker =
(local_scheduler_client **) utarray_eltptr(state->workers, i);
if ((*worker)->task_in_progress != NULL) {
free_task((*worker)->task_in_progress);
(*worker)->task_in_progress = NULL;
}
free(*worker);
*worker = NULL;
}
utarray_free(state->workers);
state->workers = NULL;
/* Free the algorithm state. */
free_scheduling_algorithm_state(state->algorithm_state);
state->algorithm_state = NULL;
/* Free the input buffer. */
utarray_free(state->input_buffer);
state->input_buffer = NULL;
/* Destroy the event loop. */
event_loop_destroy(state->loop);
state->loop = NULL;
/* Free the scheduler state. */
free(state);
}
void assign_task_to_worker(local_scheduler_state *state,
task_spec *spec,
local_scheduler_client *worker) {
@@ -384,10 +529,33 @@ void process_message(event_loop *loop,
} break;
case DISCONNECT_CLIENT: {
LOG_INFO("Disconnecting client on fd %d", client_sock);
event_loop_remove_file(loop, client_sock);
kill_worker(worker, false);
} break;
case LOG_MESSAGE: {
} break;
case REGISTER_PID: {
pid_t *worker_pid = (pid_t *) utarray_front(state->input_buffer);
worker->pid = *worker_pid;
/* Determine if this worker is one of our child processes. */
LOG_DEBUG("Pid is %d", *worker_pid);
pid_t *child_pid;
int index = 0;
for (child_pid = (pid_t *) utarray_front(state->child_pids);
child_pid != NULL;
child_pid = (pid_t *) utarray_next(state->child_pids, child_pid)) {
if (*child_pid == *worker_pid) {
/* If this worker is one of our child processes, mark it as a child so
* that we know that we can wait for the process to exit during
* cleanup. */
worker->is_child = true;
utarray_erase(state->child_pids, index, 1);
LOG_DEBUG("Found matching child pid %d", *worker_pid);
break;
}
++index;
}
} break;
default:
/* This code should be unreachable. */
CHECK(0);
@@ -405,6 +573,8 @@ void new_client_connection(event_loop *loop,
local_scheduler_client *worker = malloc(sizeof(local_scheduler_client));
worker->sock = new_socket;
worker->task_in_progress = NULL;
worker->pid = 0;
worker->is_child = false;
worker->local_scheduler_state = state;
utarray_push_back(state->workers, &worker);
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
@@ -417,6 +587,7 @@ void new_client_connection(event_loop *loop,
local_scheduler_state *g_state;
void signal_handler(int signal) {
LOG_DEBUG("Signal was %d", signal);
if (signal == SIGTERM) {
free_local_scheduler(g_state);
exit(0);
@@ -442,14 +613,6 @@ int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
return LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS;
}
void start_new_worker(local_scheduler_state *state) {
/* We can't start a worker if we don't have the path to the worker script. */
CHECK(state->config.start_worker_command != NULL);
/* Launch the process to create the worker. */
FILE *p = popen(state->config.start_worker_command, "r");
UNUSED(p);
}
void start_server(const char *node_ip_address,
const char *socket_name,
const char *redis_addr,
@@ -458,8 +621,9 @@ void start_server(const char *node_ip_address,
const char *plasma_manager_socket_name,
const char *plasma_manager_address,
bool global_scheduler_exists,
const double static_resource_conf[],
const char *start_worker_command,
const double static_resource_conf[]) {
int num_workers) {
/* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
* to a client that has already died, the local scheduler could die. */
signal(SIGPIPE, SIG_IGN);
@@ -468,8 +632,8 @@ void start_server(const char *node_ip_address,
g_state = init_local_scheduler(
node_ip_address, loop, redis_addr, redis_port, socket_name,
plasma_store_socket_name, plasma_manager_socket_name,
plasma_manager_address, global_scheduler_exists, start_worker_command,
static_resource_conf);
plasma_manager_address, global_scheduler_exists, static_resource_conf,
start_worker_command, num_workers);
/* Register a callback for registering new clients. */
event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection,
g_state);
@@ -518,14 +682,16 @@ int main(int argc, char *argv[]) {
char *plasma_manager_address = NULL;
/* The IP address of the node that this local scheduler is running on. */
char *node_ip_address = NULL;
/* The command to run when starting new workers. */
char *start_worker_command = NULL;
/* Comma-separated list of configured resource capabilities for this node. */
char *static_resource_list = NULL;
double static_resource_conf[MAX_RESOURCE_INDEX];
/* The command to run when starting new workers. */
char *start_worker_command = NULL;
/* The number of workers to start. */
char *num_workers_str = NULL;
int c;
bool global_scheduler_exists = true;
while ((c = getopt(argc, argv, "s:r:p:m:ga:h:w:c:")) != -1) {
while ((c = getopt(argc, argv, "s:r:p:m:ga:h:c:w:n:")) != -1) {
switch (c) {
case 's':
scheduler_socket_name = optarg;
@@ -548,11 +714,14 @@ int main(int argc, char *argv[]) {
case 'h':
node_ip_address = optarg;
break;
case 'c':
static_resource_list = optarg;
break;
case 'w':
start_worker_command = optarg;
break;
case 'c':
static_resource_list = optarg;
case 'n':
num_workers_str = optarg;
break;
default:
LOG_FATAL("unknown option %c", c);
@@ -585,6 +754,16 @@ int main(int argc, char *argv[]) {
if (!node_ip_address) {
LOG_FATAL("please specify the node IP address with -h switch");
}
int num_workers = 0;
if (num_workers_str) {
num_workers = strtol(num_workers_str, NULL, 10);
if (num_workers < 0) {
LOG_FATAL("Number of workers must be nonnegative");
}
}
char *redis_addr = NULL;
int redis_port = -1;
if (!redis_addr_port) {
/* Start the local scheduler without connecting to Redis. In this case, all
* submitted tasks will be queued and scheduled locally. */
@@ -593,31 +772,34 @@ int main(int argc, char *argv[]) {
"if a plasma manager socket name is provided with the -m switch, "
"then a redis address must be provided with the -r switch");
}
start_server(node_ip_address, scheduler_socket_name, NULL, -1,
plasma_store_socket_name, NULL, plasma_manager_address,
global_scheduler_exists, start_worker_command,
static_resource_conf);
} else {
char redis_addr_buffer[16] = {0};
char redis_port_str[6] = {0};
/* Parse the Redis address into an IP address and a port. */
char redis_addr[16] = {0};
char redis_port[6] = {0};
int num_assigned =
sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]", redis_addr, redis_port);
int num_assigned = sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]",
redis_addr_buffer, redis_port_str);
if (num_assigned != 2) {
LOG_FATAL(
"if a redis address is provided with the -r switch, it should be "
"formatted like 127.0.0.1:6379");
}
redis_addr = redis_addr_buffer;
redis_port = strtol(redis_port_str, NULL, 10);
if (redis_port == 0) {
LOG_FATAL("Unable to parse port number from redis address %s",
redis_addr_port);
}
if (!plasma_manager_socket_name) {
LOG_FATAL(
"please specify socket for connecting to Plasma manager with -m "
"switch");
}
start_server(node_ip_address, scheduler_socket_name, &redis_addr[0],
atoi(redis_port), plasma_store_socket_name,
plasma_manager_socket_name, plasma_manager_address,
global_scheduler_exists, start_worker_command,
static_resource_conf);
}
LOG_INFO("Start worker command is %s", start_worker_command);
start_server(node_ip_address, scheduler_socket_name, redis_addr, redis_port,
plasma_store_socket_name, plasma_manager_socket_name,
plasma_manager_address, global_scheduler_exists,
static_resource_conf, start_worker_command, num_workers);
}
#endif
+6 -1
View File
@@ -79,8 +79,9 @@ local_scheduler_state *init_local_scheduler(
const char *plasma_store_socket_name,
const char *plasma_manager_address,
bool global_scheduler_exists,
const double static_resource_vector[],
const char *worker_path,
const double static_resource_vector[]);
int num_workers);
void free_local_scheduler(local_scheduler_state *state);
@@ -90,6 +91,10 @@ void process_message(event_loop *loop,
int client_sock,
void *context,
int events);
void kill_worker(local_scheduler_client *worker, bool wait);
void start_worker(local_scheduler_state *state);
#endif
#endif /* PHOTON_SCHEDULER_H */
+142 -38
View File
@@ -33,25 +33,33 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) {
}
typedef struct {
/** A socket to mock the Plasma store. */
int plasma_fd;
/** A socket to mock the Plasma manager. Clients (such as workers) that
* connect to this file descriptor must be accepted. */
int plasma_manager_fd;
/** A socket to communicate with the Plasma store. */
int plasma_store_fd;
/** Photon's socket for IPC requests. */
int photon_fd;
/** Photon's local scheduler state. */
local_scheduler_state *photon_state;
/** Photon's event loop. */
event_loop *loop;
/** A Photon client connection. */
photon_conn *conn;
/** Number of Photon client connections, or mock workers. */
int num_photon_conns;
/** Photon client connections. */
photon_conn **conns;
} photon_mock;
photon_mock *init_photon_mock(bool connect_to_redis) {
photon_mock *init_photon_mock(bool connect_to_redis,
int num_workers,
int num_mock_workers) {
const char *node_ip_address = "127.0.0.1";
const char *redis_addr = NULL;
int redis_port = -1;
const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS,
DEFAULT_NUM_GPUS};
if (connect_to_redis) {
redis_addr = "127.0.0.1";
redis_addr = node_ip_address;
redis_port = 6379;
}
@@ -59,34 +67,60 @@ photon_mock *init_photon_mock(bool connect_to_redis) {
memset(mock, 0, sizeof(photon_mock));
mock->loop = event_loop_create();
/* Bind to the Photon port and initialize the Photon scheduler. */
/* TODO(rkn): Why are we reusing mock->plasma_fd for both the store and the
* manager? */
UT_string *plasma_manager_socket_name =
bind_ipc_sock_retry(plasma_manager_socket_name_format, &mock->plasma_fd);
mock->plasma_fd = socket_connect_retry(plasma_store_socket_name, 5, 100);
UT_string *plasma_manager_socket_name = bind_ipc_sock_retry(
plasma_manager_socket_name_format, &mock->plasma_manager_fd);
mock->plasma_store_fd =
socket_connect_retry(plasma_store_socket_name, 5, 100);
UT_string *photon_socket_name =
bind_ipc_sock_retry(photon_socket_name_format, &mock->photon_fd);
CHECK(mock->plasma_fd >= 0 && mock->photon_fd >= 0);
CHECK(mock->plasma_store_fd >= 0 && mock->photon_fd >= 0);
UT_string *worker_command;
utstring_new(worker_command);
utstring_printf(worker_command,
"python ../../python/ray/workers/default_worker.py "
"--node-ip-address=%s --object-store-name=%s "
"--object-store-manager-name=%s --local-scheduler-name=%s "
"--redis-address=%s:%d",
node_ip_address, plasma_store_socket_name,
utstring_body(plasma_manager_socket_name),
utstring_body(photon_socket_name), redis_addr, redis_port);
mock->photon_state = init_local_scheduler(
"127.0.0.1", mock->loop, redis_addr, redis_port,
utstring_body(photon_socket_name), plasma_store_socket_name,
utstring_body(plasma_manager_socket_name), NULL, false, NULL,
static_resource_conf);
utstring_body(plasma_manager_socket_name), NULL, false,
static_resource_conf, utstring_body(worker_command), num_workers);
/* Accept the workers as clients to the plasma manager. */
for (int i = 0; i < num_workers; ++i) {
accept_client(mock->plasma_manager_fd);
}
/* Connect a Photon client. */
mock->conn = photon_connect(utstring_body(photon_socket_name));
new_client_connection(mock->loop, mock->photon_fd,
(void *) mock->photon_state, 0);
mock->num_photon_conns = num_mock_workers;
mock->conns = malloc(sizeof(photon_conn *) * num_mock_workers);
for (int i = 0; i < num_mock_workers; ++i) {
mock->conns[i] = photon_connect(utstring_body(photon_socket_name));
new_client_connection(mock->loop, mock->photon_fd,
(void *) mock->photon_state, 0);
}
utstring_free(worker_command);
utstring_free(plasma_manager_socket_name);
utstring_free(photon_socket_name);
return mock;
}
/* Tear down a photon_mock: disconnect its Photon client connection(s), free
* the local scheduler state, close the mock's sockets and free the mock
* itself. */
void destroy_photon_mock(photon_mock *mock) {
photon_disconnect(mock->conn);
close(mock->photon_fd);
close(mock->plasma_fd);
for (int i = 0; i < mock->num_photon_conns; ++i) {
photon_disconnect(mock->conns[i]);
}
free(mock->conns);
/* This also frees mock->loop. */
free_local_scheduler(mock->photon_state);
close(mock->plasma_store_fd);
close(mock->plasma_manager_fd);
free(mock);
}
@@ -103,7 +137,9 @@ void reset_worker(photon_mock *mock, local_scheduler_client *worker) {
* value, the task should get assigned to a worker again.
*/
TEST object_reconstruction_test(void) {
photon_mock *photon = init_photon_mock(true);
photon_mock *photon = init_photon_mock(true, 0, 1);
photon_conn *worker = photon->conns[0];
/* Create a task with zero dependencies and one return value. */
task_spec *spec = example_task_spec(0, 1);
object_id return_id = task_return(spec, 0);
@@ -125,10 +161,10 @@ TEST object_reconstruction_test(void) {
if (pid == 0) {
/* Make sure we receive the task twice. First from the initial submission,
* and second from the reconstruct request. */
photon_submit(photon->conn, spec);
task_spec *task_assigned = photon_get_task(photon->conn);
photon_submit(worker, spec);
task_spec *task_assigned = photon_get_task(worker);
ASSERT_EQ(memcmp(task_assigned, spec, task_spec_size(spec)), 0);
task_spec *reconstruct_task = photon_get_task(photon->conn);
task_spec *reconstruct_task = photon_get_task(worker);
ASSERT_EQ(memcmp(reconstruct_task, spec, task_spec_size(spec)), 0);
/* Clean up. */
free_task_spec(reconstruct_task);
@@ -150,7 +186,7 @@ TEST object_reconstruction_test(void) {
(retry_info *) &photon_retry, NULL, NULL);
/* Trigger reconstruction, and run the event loop again. */
object_id return_id = task_return(spec, 0);
photon_reconstruct_object(photon->conn, return_id);
photon_reconstruct_object(worker, return_id);
event_loop_add_timer(photon->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(photon->loop);
@@ -171,7 +207,8 @@ TEST object_reconstruction_test(void) {
* should trigger reconstruction of all previous tasks in the lineage.
*/
TEST object_reconstruction_recursive_test(void) {
photon_mock *photon = init_photon_mock(true);
photon_mock *photon = init_photon_mock(true, 0, 1);
photon_conn *worker = photon->conns[0];
/* Create a chain of tasks, each one dependent on the one before it. Mark
* each object as available so that tasks will run immediately. */
const int NUM_TASKS = 10;
@@ -204,11 +241,11 @@ TEST object_reconstruction_recursive_test(void) {
if (pid == 0) {
/* Submit the tasks, and make sure each one gets assigned to a worker. */
for (int i = 0; i < NUM_TASKS; ++i) {
photon_submit(photon->conn, specs[i]);
photon_submit(worker, specs[i]);
}
/* Make sure we receive each task from the initial submission. */
for (int i = 0; i < NUM_TASKS; ++i) {
task_spec *task_assigned = photon_get_task(photon->conn);
task_spec *task_assigned = photon_get_task(worker);
ASSERT_EQ(memcmp(task_assigned, specs[i], task_spec_size(task_assigned)),
0);
free_task_spec(task_assigned);
@@ -216,7 +253,7 @@ TEST object_reconstruction_recursive_test(void) {
/* Check that the workers receive all tasks in the final return object's
* lineage during reconstruction. */
for (int i = 0; i < NUM_TASKS; ++i) {
task_spec *task_assigned = photon_get_task(photon->conn);
task_spec *task_assigned = photon_get_task(worker);
bool found = false;
for (int j = 0; j < NUM_TASKS; ++j) {
if (specs[j] == NULL) {
@@ -249,7 +286,7 @@ TEST object_reconstruction_recursive_test(void) {
/* Trigger reconstruction for the last object, and run the event loop
* again. */
object_id return_id = task_return(specs[NUM_TASKS - 1], 0);
photon_reconstruct_object(photon->conn, return_id);
photon_reconstruct_object(worker, return_id);
event_loop_add_timer(photon->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(photon->loop);
@@ -275,25 +312,27 @@ task_spec *object_reconstruction_suppression_spec;
/* Callback passed to object_table_add in
* object_reconstruction_suppression_test. It submits
* object_reconstruction_suppression_spec over the Photon connection obtained
* from user_context. */
void object_reconstruction_suppression_callback(object_id object_id,
void *user_context) {
/* Submit the task after adding the object to the object table. */
photon_mock *photon = user_context;
photon_submit(photon->conn, object_reconstruction_suppression_spec);
photon_conn *worker = user_context;
photon_submit(worker, object_reconstruction_suppression_spec);
}
TEST object_reconstruction_suppression_test(void) {
photon_mock *photon = init_photon_mock(true);
photon_mock *photon = init_photon_mock(true, 0, 1);
photon_conn *worker = photon->conns[0];
object_reconstruction_suppression_spec = example_task_spec(0, 1);
object_id return_id = task_return(object_reconstruction_suppression_spec, 0);
pid_t pid = fork();
if (pid == 0) {
/* Make sure we receive the task once. This will block until the
* object_table_add callback completes. */
task_spec *task_assigned = photon_get_task(photon->conn);
task_spec *task_assigned = photon_get_task(worker);
ASSERT_EQ(memcmp(task_assigned, object_reconstruction_suppression_spec,
task_spec_size(object_reconstruction_suppression_spec)),
0);
/* Trigger a reconstruction. We will check that no tasks get queued as a
* result of this line in the event loop process. */
photon_reconstruct_object(photon->conn, return_id);
photon_reconstruct_object(worker, return_id);
/* Clean up. */
free_task_spec(task_assigned);
free_task_spec(object_reconstruction_suppression_spec);
@@ -309,7 +348,7 @@ TEST object_reconstruction_suppression_test(void) {
object_table_add(db, return_id, 1, (unsigned char *) NIL_DIGEST,
(retry_info *) &photon_retry,
object_reconstruction_suppression_callback,
(void *) photon);
(void *) worker);
/* Run the event loop. NOTE: OSX appears to require the parent process to
* listen for events on the open file descriptors. */
event_loop_add_timer(photon->loop, 1000,
@@ -328,7 +367,7 @@ TEST object_reconstruction_suppression_test(void) {
}
TEST task_dependency_test(void) {
photon_mock *photon = init_photon_mock(false);
photon_mock *photon = init_photon_mock(false, 0, 1);
local_scheduler_state *state = photon->photon_state;
scheduling_algorithm_state *algorithm_state = state->algorithm_state;
/* Get the first worker. */
@@ -403,7 +442,7 @@ TEST task_dependency_test(void) {
}
TEST task_multi_dependency_test(void) {
photon_mock *photon = init_photon_mock(false);
photon_mock *photon = init_photon_mock(false, 0, 1);
local_scheduler_state *state = photon->photon_state;
scheduling_algorithm_state *algorithm_state = state->algorithm_state;
/* Get the first worker. */
@@ -476,12 +515,77 @@ TEST task_multi_dependency_test(void) {
PASS();
}
/* Check the worker lifecycle managed by the local scheduler: workers started
* at initialization show up first as child pids and then as registered
* workers, kill_worker removes a worker, and start_worker can launch a
* replacement after initialization. */
TEST start_kill_workers_test(void) {
/* Start some workers. */
int num_workers = 4;
photon_mock *photon = init_photon_mock(true, num_workers, 0);
/* We start off with num_workers children processes, but no workers
* registered yet. */
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers);
ASSERT_EQ(utarray_len(photon->photon_state->workers), 0);
/* Make sure that each worker connects to the photon scheduler. This for loop
* will hang if one of the workers does not connect. */
for (int i = 0; i < num_workers; ++i) {
new_client_connection(photon->loop, photon->photon_fd,
(void *) photon->photon_state, 0);
}
/* After handling each worker's initial connection, we should now have all
* workers accounted for, but we haven't yet matched up process IDs with our
* children processes. */
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers);
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers);
/* Each worker should register its process ID. Once every registration
* message has been processed, no unmatched child pids should remain. */
for (int i = 0; i < utarray_len(photon->photon_state->workers); ++i) {
local_scheduler_client *worker =
*(local_scheduler_client **) utarray_eltptr(
photon->photon_state->workers, i);
process_message(photon->photon_state->loop, worker->sock, worker, 0);
}
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0);
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers);
/* After killing a worker, its state is cleaned up. */
local_scheduler_client *worker = *(local_scheduler_client **) utarray_eltptr(
photon->photon_state->workers, 0);
kill_worker(worker, true);
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0);
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1);
/* Start a worker after the local scheduler has been initialized. */
start_worker(photon->photon_state);
/* Accept the new worker as a client to the plasma manager. */
int new_worker_fd = accept_client(photon->plasma_manager_fd);
/* The new worker is tracked as a child process, but it has not yet connected
* to the photon scheduler, so the number of workers is unchanged. */
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1);
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1);
/* Make sure the new worker connects to the photon scheduler. */
new_client_connection(photon->loop, photon->photon_fd,
(void *) photon->photon_state, 0);
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 1);
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers);
/* Make sure that the new worker registers its process ID. */
worker = *(local_scheduler_client **) utarray_eltptr(
photon->photon_state->workers, num_workers - 1);
process_message(photon->photon_state->loop, worker->sock, worker, 0);
ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0);
ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers);
/* Clean up. */
close(new_worker_fd);
destroy_photon_mock(photon);
PASS();
}
/* NOTE(review): start_kill_workers_test calls init_photon_mock with
* connect_to_redis = true but is registered with RUN_TEST rather than
* RUN_REDIS_TEST -- confirm that this is intended. */
SUITE(photon_tests) {
RUN_REDIS_TEST(object_reconstruction_test);
RUN_REDIS_TEST(object_reconstruction_recursive_test);
RUN_REDIS_TEST(object_reconstruction_suppression_test);
RUN_TEST(task_dependency_test);
RUN_TEST(task_multi_dependency_test);
RUN_TEST(start_kill_workers_test);
}
GREATEST_MAIN_DEFS();
+8 -2
View File
@@ -16,7 +16,10 @@ class ComponentFailureTest(unittest.TestCase):
def f():
ray.worker.global_worker.plasma_client.get(obj_id)
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
ray.worker._init(num_workers=1,
driver_mode=ray.SILENT_MODE,
start_workers_from_local_scheduler=False,
start_ray_local=True)
# Have the worker wait in a get call.
f.remote()
@@ -44,7 +47,10 @@ class ComponentFailureTest(unittest.TestCase):
def f():
ray.worker.global_worker.plasma_client.wait([obj_id])
ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)
ray.worker._init(num_workers=1,
driver_mode=ray.SILENT_MODE,
start_workers_from_local_scheduler=False,
start_ray_local=True)
# Have the worker wait in a wait call.
f.remote()
+16
View File
@@ -183,6 +183,22 @@ class SerializationTest(unittest.TestCase):
class WorkerTest(unittest.TestCase):
def testPythonWorkers(self):
# Test the codepath for starting workers from the Python script, instead of
# the local scheduler. This codepath is for debugging purposes only.
num_workers = 4
ray.worker._init(num_workers=num_workers,
start_workers_from_local_scheduler=False,
start_ray_local=True)
@ray.remote
def f(x):
return x
# Run twice as many tasks as there are workers and check that each one
# returns its argument.
values = ray.get([f.remote(1) for i in range(num_workers * 2)])
self.assertEqual(values, [1] * (num_workers * 2))
ray.worker.cleanup()
def testPutGet(self):
ray.init(num_workers=0)