From be1618f041f9d02019c1b2ee09047291596ec2a0 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 25 Feb 2017 20:19:36 -0800 Subject: [PATCH] Availability after worker failure (#316) * Availability after a killed worker * Workers exit cleanly * Memory cleanup in photon C tests * Worker failure in multinode * Consolidate worker cleanup handlers * Update the result table before handling a task submission * KILL_WORKER_TIMEOUT -> KILL_WORKER_TIMEOUT_MILLISECONDS * Log a warning instead of crashing if no result table entry found --- python/common/redis_module/runtest.py | 33 ++++- python/ray/worker.py | 31 +++-- src/common/redis_module/ray_redis_module.c | 21 +-- src/common/state/redis.c | 4 +- src/common/state/task_table.c | 4 +- src/common/state/task_table.h | 11 +- src/common/task.h | 4 +- src/photon/photon_algorithm.c | 70 ++++++---- src/photon/photon_algorithm.h | 12 ++ src/photon/photon_extension.c | 2 +- src/photon/photon_scheduler.c | 148 +++++++++++++++------ src/photon/photon_scheduler.h | 4 + src/photon/test/photon_tests.c | 48 ++++--- test/component_failures_test.py | 33 +++++ 14 files changed, 307 insertions(+), 118 deletions(-) diff --git a/python/common/redis_module/runtest.py b/python/common/redis_module/runtest.py index 604810eb6..690d93fb9 100644 --- a/python/common/redis_module/runtest.py +++ b/python/common/redis_module/runtest.py @@ -216,14 +216,18 @@ class TestGlobalStateStore(unittest.TestCase): "node_id") def testTaskTableAddAndLookup(self): + TASK_STATUS_WAITING = 1 + TASK_STATUS_SCHEDULED = 2 + TASK_STATUS_QUEUED = 4 + # Check that task table adds, updates, and lookups work correctly. 
- task_args = [1, b"node_id", b"task_spec"] + task_args = [TASK_STATUS_WAITING, b"node_id", b"task_spec"] response = self.redis.execute_command("RAY.TASK_TABLE_ADD", "task_id", *task_args) response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id") self.assertEqual(response, task_args) - task_args[0] = 2 + task_args[0] = TASK_STATUS_SCHEDULED self.redis.execute_command("RAY.TASK_TABLE_UPDATE", "task_id", *task_args[:2]) response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id") self.assertEqual(response, task_args) @@ -241,7 +245,7 @@ class TestGlobalStateStore(unittest.TestCase): # If the current value is the same as the test value, and the set value is # different, the update happens, and the response is the entire task. - task_args[1] += 1 + task_args[1] = TASK_STATUS_QUEUED response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE", "task_id", *task_args[:3]) @@ -252,7 +256,7 @@ class TestGlobalStateStore(unittest.TestCase): # If the current value is no longer the same as the test value, the # response is nil. - task_args[1] += 1 + task_args[1] = TASK_STATUS_WAITING response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE", "task_id", *task_args[:3]) @@ -262,6 +266,27 @@ class TestGlobalStateStore(unittest.TestCase): self.assertEqual(get_response2, get_response) self.assertNotEqual(get_response2, task_args[1:]) + # If the test value is a bitmask that matches the current value, the update + # happens. + task_args[0] = TASK_STATUS_SCHEDULED | TASK_STATUS_QUEUED + response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE", + "task_id", + *task_args[:3]) + self.assertEqual(response, task_args[1:]) + + # If the test value is a bitmask that does not match the current value, the + # update does not happen. 
+ task_args[1] = TASK_STATUS_SCHEDULED + old_response = response + response = self.redis.execute_command("RAY.TASK_TABLE_TEST_AND_UPDATE", + "task_id", + *task_args[:3]) + self.assertEqual(response, None) + # Check that the update did not happen. + get_response = self.redis.execute_command("RAY.TASK_TABLE_GET", "task_id") + self.assertEqual(get_response, old_response) + self.assertNotEqual(get_response, task_args[1:]) + def testTaskTableSubscribe(self): scheduling_state = 1 node_id = "node_id" diff --git a/python/ray/worker.py b/python/ray/worker.py index e1db62bf0..9823ba0cd 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -14,6 +14,7 @@ import numpy as np import os import random import redis +import signal import string import sys import threading @@ -936,23 +937,31 @@ def init(redis_address=None, node_ip_address=None, object_id_seed=None, num_gpus=num_gpus) def cleanup(worker=global_worker): - """Disconnect the driver, and terminate any processes started in init. + """Disconnect the worker, and terminate any processes started in init. This will automatically run at the end when a Python process that uses Ray exits. It is ok to run this twice in a row. Note that we manually call services.cleanup() in the tests because we need to start and stop many clusters in the tests, but the import and exit only happen once. """ - # If this is a driver, push the finish time to Redis. - if worker.mode in [SCRIPT_MODE, SILENT_MODE]: - worker.redis_client.hmset(b"Drivers:" + worker.worker_id, - {"end_time": time.time()}) - disconnect(worker) - worker.set_mode(None) + if hasattr(worker, "photon_client"): + del worker.photon_client if hasattr(worker, "plasma_client"): worker.plasma_client.shutdown() - services.cleanup() + + if worker.mode in [SCRIPT_MODE, SILENT_MODE]: + # If this is a driver, push the finish time to Redis and clean up any + # other services that were started with the driver. 
+ worker.redis_client.hmset(b"Drivers:" + worker.worker_id, + {"end_time": time.time()}) + services.cleanup() + else: + # If this is not a driver, make sure there are no orphan processes. + for process_type, processes in services.all_processes.items(): + assert(len(processes) == 0) + + worker.set_mode(None) atexit.register(cleanup) @@ -1559,6 +1568,12 @@ def main_loop(worker=global_worker): that occurred while executing the command, and waits for the next command. """ + def exit(signum, frame): + cleanup(worker=worker) + sys.exit(0) + + signal.signal(signal.SIGTERM, exit) + def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope more quickly, which is useful for inspecting reference counts """Execute a task assigned to this worker. diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index 949918317..8e5d094f6 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -844,17 +844,18 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx, /** * Test and update an entry in the task table if the current value matches the - * test value. This does not update the task specification in the table. + * test value bitmask. This does not update the task specification in the + * table. * * This is called from a client with the command: * - * RAY.TASK_TABLE_TEST_AND_UPDATE <task ID> <test state> <state> + * RAY.TASK_TABLE_TEST_AND_UPDATE <task ID> <test state bitmask> <state> * <local scheduler ID> * * @param task_id A string that is the ID of the task. - * @param test_state A string that is the test value for the scheduling state. - * The update happens if and only if the current scheduling state - * matches this value. + * @param test_state_bitmask A string that is the test bitmask for the + * scheduling state. The update happens if and only if the current + * scheduling state AND-ed with the bitmask is greater than 0. 
* @param state A string that is the scheduling state (a scheduling_state enum * instance) to update the task entry with. The string's value must be a * nonnegative integer less than 100, so that it has width at most 2. If @@ -862,7 +863,7 @@ int TaskTableUpdate_RedisCommand(RedisModuleCtx *ctx, * 2. * @param ray_client_id A string that is the ray client ID of the associated * local scheduler, if any, to update the task entry with. - * @return If the current scheduling state does not match the test value, + * @return If the current scheduling state does not match the test bitmask, * returns nil. Else, returns the same as RAY.TASK_TABLE_GET: an array * of strings representing the updated task fields in the following * order: 1) (integer) scheduling state 2) (string) associated node ID, @@ -903,16 +904,16 @@ int TaskTableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx, "Found invalid scheduling state (must " "be an integer of width 2"); } - long long test_state_integer; - int status = RedisModule_StringToLongLong(argv[2], &test_state_integer); + long long test_state_bitmask; + int status = RedisModule_StringToLongLong(argv[2], &test_state_bitmask); if (status != REDISMODULE_OK) { RedisModule_CloseKey(key); RedisModule_FreeString(ctx, state); return RedisModule_ReplyWithError( ctx, "Invalid test value for scheduling state"); } - if (current_state_integer != test_state_integer) { - /* The current value does not match the test value, so do not perform the + if ((current_state_integer & test_state_bitmask) == 0) { + /* The current value does not match the test bitmask, so do not perform the * update. 
*/ RedisModule_CloseKey(key); RedisModule_FreeString(ctx, state); diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 19d56d489..90489409a 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -864,8 +864,8 @@ void redis_task_table_test_and_update(table_callback_data *callback_data) { db->context, redis_task_table_test_and_update_callback, (void *) callback_data->timer_id, "RAY.TASK_TABLE_TEST_AND_UPDATE %b %d %d %b", task_id.id, - sizeof(task_id.id), update_data->test_state, update_data->update_state, - update_data->local_scheduler_id.id, + sizeof(task_id.id), update_data->test_state_bitmask, + update_data->update_state, update_data->local_scheduler_id.id, sizeof(update_data->local_scheduler_id.id)); if ((status == REDIS_ERR) || db->context->err) { LOG_REDIS_DEBUG(db->context, "error in redis_task_table_test_and_update"); diff --git a/src/common/state/task_table.c b/src/common/state/task_table.c index 547921569..26d7e8199 100644 --- a/src/common/state/task_table.c +++ b/src/common/state/task_table.c @@ -32,14 +32,14 @@ void task_table_update(db_handle *db_handle, void task_table_test_and_update(db_handle *db_handle, task_id task_id, - int test_state, + int test_state_bitmask, int update_state, retry_info *retry, task_table_get_callback done_callback, void *user_context) { task_table_test_and_update_data *update_data = malloc(sizeof(task_table_test_and_update_data)); - update_data->test_state = test_state; + update_data->test_state_bitmask = test_state_bitmask; update_data->update_state = update_state; /* Update the task entry's local scheduler with this client's ID. */ update_data->local_scheduler_id = db_handle->client; diff --git a/src/common/state/task_table.h b/src/common/state/task_table.h index a715980f6..c61477980 100644 --- a/src/common/state/task_table.h +++ b/src/common/state/task_table.h @@ -96,10 +96,11 @@ void task_table_update(db_handle *db_handle, * * @param db_handle Database handle. 
* @param task_id The task ID of the task entry to update. - * @param test_state The value to test the current task entry's scheduling - * state against. + * @param test_state_bitmask The bitmask to apply to the task entry's current + * scheduling state. The update happens if and only if the current + * scheduling state AND-ed with the bitmask is greater than 0. * @param update_state The value to update the task entry's scheduling state - * with, if the current state matches test_state. + * with, if the current state matches test_state_bitmask. * @param retry Information about retrying the request to the database. * @param done_callback Function to be called when database returns result. * @param user_context Data that will be passed to done_callback and @@ -108,7 +109,7 @@ void task_table_update(db_handle *db_handle, */ void task_table_test_and_update(db_handle *db_handle, task_id task_id, - int test_state, + int test_state_bitmask, int update_state, retry_info *retry, task_table_get_callback done_callback, @@ -116,7 +117,7 @@ void task_table_test_and_update(db_handle *db_handle, /* Data that is needed to test and set the task's scheduling state. */ typedef struct { - int test_state; + int test_state_bitmask; int update_state; db_client_id local_scheduler_id; } task_table_test_and_update_data; diff --git a/src/common/task.h b/src/common/task.h index 012688089..03c921253 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -348,8 +348,10 @@ typedef enum { TASK_STATUS_RUNNING = 8, /** The task is done executing. */ TASK_STATUS_DONE = 16, + /** The task was not able to finish. */ + TASK_STATUS_LOST = 32, /** The task will be submitted for reexecution. */ - TASK_STATUS_RECONSTRUCTING = 32 + TASK_STATUS_RECONSTRUCTING = 64 } scheduling_state; /** A task is an execution of a task specification. 
It has a state of execution diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 0b2db5e1a..38fe961de 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -803,24 +803,6 @@ bool resource_constraints_satisfied(local_scheduler_state *state, return true; } -/** - * Update the result table, which holds mappings of object ID -> ID of the - * task that created it. - * - * @param state The scheduler state. - * @param spec The task spec in question. - * @return Void. - */ -void update_result_table(local_scheduler_state *state, task_spec *spec) { - if (state->db != NULL) { - task_id task_id = task_spec_id(spec); - for (int64_t i = 0; i < task_num_returns(spec); ++i) { - object_id return_id = task_return(spec, i); - result_table_add(state->db, return_id, task_id, NULL, NULL, NULL); - } - } -} - void handle_task_submitted(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, task_spec *spec) { @@ -843,10 +825,6 @@ void handle_task_submitted(local_scheduler_state *state, /* Try to dispatch tasks, since we may have added one to the queue. */ dispatch_tasks(state, algorithm_state); - - /* Update the result table, which holds mappings of object ID -> ID of the - * task that created it. */ - update_result_table(state, spec); } void handle_actor_task_submitted(local_scheduler_state *state, @@ -881,10 +859,6 @@ void handle_actor_task_submitted(local_scheduler_state *state, give_task_to_local_scheduler(state, algorithm_state, spec, entry->local_scheduler_id); } - - /* Update the result table, which holds mappings of object ID -> ID of the - * task that created it. 
*/ - update_result_table(state, spec); } void handle_actor_creation_notification( @@ -994,6 +968,50 @@ void handle_worker_available(local_scheduler_state *state, dispatch_tasks(state, algorithm_state); } +void handle_worker_removed(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker) { + /* Make sure that we remove the worker at most once. */ + bool removed = false; + int64_t num_workers; + + /* Remove the worker from available workers, if it's there. */ + num_workers = utarray_len(algorithm_state->available_workers); + for (int64_t i = num_workers - 1; i >= 0; --i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->available_workers, i); + DCHECK(!((*p == worker) && removed)); + if (*p == worker) { + utarray_erase(algorithm_state->available_workers, i, 1); + removed = true; + } + } + + /* Remove the worker from executing workers, if it's there. */ + num_workers = utarray_len(algorithm_state->executing_workers); + for (int64_t i = num_workers - 1; i >= 0; --i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->executing_workers, i); + DCHECK(!((*p == worker) && removed)); + if (*p == worker) { + utarray_erase(algorithm_state->executing_workers, i, 1); + removed = true; + } + } + + /* Remove the worker from blocked workers, if it's there. 
*/ + num_workers = utarray_len(algorithm_state->blocked_workers); + for (int64_t i = num_workers - 1; i >= 0; --i) { + local_scheduler_client **p = (local_scheduler_client **) utarray_eltptr( + algorithm_state->blocked_workers, i); + DCHECK(!((*p == worker) && removed)); + if (*p == worker) { + utarray_erase(algorithm_state->blocked_workers, i, 1); + removed = true; + } + } +} + void handle_actor_worker_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, local_scheduler_client *worker) { diff --git a/src/photon/photon_algorithm.h b/src/photon/photon_algorithm.h index d34f3c674..0f96895d3 100644 --- a/src/photon/photon_algorithm.h +++ b/src/photon/photon_algorithm.h @@ -150,6 +150,18 @@ void handle_worker_available(local_scheduler_state *state, scheduling_algorithm_state *algorithm_state, local_scheduler_client *worker); +/** + * This function is called when a worker is removed. + * + * @param state The state of the local scheduler. + * @param algorithm_state State maintained by the scheduling algorithm. + * @param worker The worker that is removed. + * @return Void. + */ +void handle_worker_removed(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + local_scheduler_client *worker); + /** * This version of handle_worker_available is called whenever the worker that is * available is running an actor. 
diff --git a/src/photon/photon_extension.c b/src/photon/photon_extension.c index 1970cd579..0658f07f4 100644 --- a/src/photon/photon_extension.c +++ b/src/photon/photon_extension.c @@ -28,7 +28,7 @@ static int PyPhotonClient_init(PyPhotonClient *self, } static void PyPhotonClient_dealloc(PyPhotonClient *self) { - free(((PyPhotonClient *) self)->photon_connection); + photon_disconnect(((PyPhotonClient *) self)->photon_connection); Py_TYPE(self)->tp_free((PyObject *) self); } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 9c4d8bcba..c24b1e562 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -57,21 +57,27 @@ void print_resource_info(const local_scheduler_state *state, #endif } +int force_kill_worker(event_loop *loop, timer_id id, void *context) { + local_scheduler_client *worker = (local_scheduler_client *) context; + kill(worker->pid, SIGKILL); + close(worker->sock); + free(worker); + return EVENT_LOOP_TIMER_DONE; +} + /** * Kill a worker, if it is a child process, and clean up all of its associated * state. * * @param worker A pointer to the worker we want to kill. - * @param wait A bool representing whether we should wait for the worker's - * process to exit. If the worker is not a child process, this flag is - * ignored. + * @param cleanup A bool representing whether we're cleaning up the entire local + * scheduler's state, or just this worker. If true, then the worker will + * be force-killed immediately. Else, the worker will be given a chance + * to clean up its own state. * @return Void. */ -void kill_worker(local_scheduler_client *worker, bool wait) { - /* TODO(swang): This method should also propagate changes to other parts of - * the system to reflect the killed task in progress, if there was one. This - * includes updating dynamic resources and updating the task table. */ - /* Erase the worker from the array of workers. 
*/ +void kill_worker(local_scheduler_client *worker, bool cleanup) { + /* Erase the local scheduler's reference to the worker. */ local_scheduler_state *state = worker->local_scheduler_state; int num_workers = utarray_len(state->workers); for (int i = 0; i < utarray_len(state->workers); ++i) { @@ -86,6 +92,8 @@ void kill_worker(local_scheduler_client *worker, bool wait) { "Found duplicate workers"); CHECKM(utarray_len(state->workers) != num_workers, "Tried to kill worker that doesn't exist"); + /* Erase the algorithm state's reference to the worker. */ + handle_worker_removed(state, state->algorithm_state, worker); /* Remove the client socket from the event loop so that we don't process the * SIGPIPE when the worker is killed. */ @@ -93,27 +101,47 @@ void kill_worker(local_scheduler_client *worker, bool wait) { /* If the worker has registered a process ID with us and it's a child * process, use it to send a kill signal. */ + bool free_worker = true; if (worker->is_child && worker->pid != 0) { - kill(worker->pid, SIGKILL); - if (wait) { - /* Wait for the process to exit. */ + if (cleanup) { + /* If we're exiting the local scheduler anyway, it's okay to force kill + * the worker immediately. Wait for the process to exit. */ + kill(worker->pid, SIGKILL); waitpid(worker->pid, NULL, 0); + close(worker->sock); + } else { + /* If we're just cleaning up a single worker, allow it some time to clean + * up its state before force killing. The client socket will be closed + * and the worker struct will be freed after the timeout. */ + kill(worker->pid, SIGTERM); + event_loop_add_timer(state->loop, KILL_WORKER_TIMEOUT_MILLISECONDS, + force_kill_worker, (void *) worker); + free_worker = false; } LOG_INFO("Killed worker with pid %d", worker->pid); } - /* Clean up the client socket after killing the worker so that the worker - * can't receive the SIGPIPE before exiting. */ - close(worker->sock); - /* Clean up the task in progress. 
*/ if (worker->task_in_progress) { - /* TODO(swang): Update the task table to mark the task as lost. */ - free_task(worker->task_in_progress); + /* Return the resources that the worker was using. */ + task_spec *spec = task_task_spec(worker->task_in_progress); + update_dynamic_resources(state, spec, true); + /* Update the task table to reflect that the task failed to complete. */ + if (state->db != NULL) { + task_set_state(worker->task_in_progress, TASK_STATUS_LOST); + task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL); + } else { + free_task(worker->task_in_progress); + } } LOG_DEBUG("Killed worker with pid %d", worker->pid); - free(worker); + if (free_worker) { + /* Clean up the client socket after killing the worker so that the worker + * can't receive the SIGPIPE before exiting. */ + close(worker->sock); + free(worker); + } } void free_local_scheduler(local_scheduler_state *state) { @@ -130,15 +158,6 @@ void free_local_scheduler(local_scheduler_state *state) { state->config.start_worker_command = NULL; } - /* Disconnect from the database. */ - if (state->db != NULL) { - db_disconnect(state->db); - state->db = NULL; - } - /* Disconnect from plasma. */ - plasma_disconnect(state->plasma_conn); - state->plasma_conn = NULL; - /* Kill any child processes that didn't register as a worker yet. */ pid_t *worker_pid; for (worker_pid = (pid_t *) utarray_front(state->child_pids); @@ -152,6 +171,8 @@ void free_local_scheduler(local_scheduler_state *state) { /* Free the list of workers and any tasks that are still in progress on those * workers. */ + /* TODO(swang): It's possible that the local scheduler will exit before all + * of its task table updates make it to redis. */ for (local_scheduler_client **worker = (local_scheduler_client **) utarray_front(state->workers); worker != NULL; @@ -161,6 +182,15 @@ void free_local_scheduler(local_scheduler_state *state) { utarray_free(state->workers); state->workers = NULL; + /* Disconnect from the database. 
*/ + if (state->db != NULL) { + db_disconnect(state->db); + state->db = NULL; + } + /* Disconnect from plasma. */ + plasma_disconnect(state->plasma_conn); + state->plasma_conn = NULL; + /* Free the mapping from the actor ID to the ID of the local scheduler * responsible for that actor. */ actor_map_entry *current_actor_map_entry, *temp_actor_map_entry; @@ -480,21 +510,41 @@ void reconstruct_task_update_callback(task *task, void *user_context) { } } -void reconstruct_result_lookup_callback(object_id reconstruct_object_id, - task_id task_id, - void *user_context) { +void reconstruct_evicted_result_lookup_callback(object_id reconstruct_object_id, + task_id task_id, + void *user_context) { /* TODO(swang): The following check will fail if an object was created by a * put. */ CHECKM(!IS_NIL_ID(task_id), "No task information found for object during reconstruction"); local_scheduler_state *state = user_context; - /* Try to claim the responsibility for reconstruction by doing a test-and-set - * of the task's scheduling state in the global state. If the task's - * scheduling state is pending completion, assume that reconstruction is - * already being taken care of. NOTE: This codepath is not responsible for - * detecting failure of the other reconstruction, or updating the - * scheduling_state accordingly. */ - task_table_test_and_update(state->db, task_id, TASK_STATUS_DONE, + /* If there are no other instances of the task running, it's safe for us to + * claim responsibility for reconstruction. */ + task_table_test_and_update(state->db, task_id, + (TASK_STATUS_DONE | TASK_STATUS_LOST), + TASK_STATUS_RECONSTRUCTING, NULL, + reconstruct_task_update_callback, state); +} + +void reconstruct_failed_result_lookup_callback(object_id reconstruct_object_id, + task_id task_id, + void *user_context) { + /* TODO(swang): The following check will fail if an object was created by a + * put. 
*/ + if (IS_NIL_ID(task_id)) { + /* NOTE(swang): For some reason, the result table update sometimes happens + * after this lookup returns, possibly due to concurrent clients. In most + * cases, this is okay because the initial execution is probably still + * pending, so for now, we log a warning and suppress reconstruction. */ + LOG_WARN( + "No task information found for object during reconstruction (no object " + "entry yet)"); + return; + } + local_scheduler_state *state = user_context; + /* If the task failed to finish, it's safe for us to claim responsibility for + * reconstruction. */ + task_table_test_and_update(state->db, task_id, TASK_STATUS_LOST, TASK_STATUS_RECONSTRUCTING, NULL, reconstruct_task_update_callback, state); } @@ -508,10 +558,19 @@ void reconstruct_object_lookup_callback(object_id reconstruct_object_id, * any nodes. NOTE: This codepath is not responsible for checking if the * object table entry is up-to-date. */ local_scheduler_state *state = user_context; + /* Look up the task that created the object in the result table. */ if (manager_count == 0) { - /* Look up the task that created the object in the result table. */ + /* If the object was created and later evicted, we reconstruct the object + * if and only if there are no other instances of the task running. */ result_table_lookup(state->db, reconstruct_object_id, NULL, - reconstruct_result_lookup_callback, (void *) state); + reconstruct_evicted_result_lookup_callback, + (void *) state); + } else if (manager_count == -1) { + /* If the object has not been created yet, we reconstruct the object if and + * only if the task that created the object failed to complete. 
*/ + result_table_lookup(state->db, reconstruct_object_id, NULL, + reconstruct_failed_result_lookup_callback, + (void *) state); } } @@ -541,6 +600,17 @@ void process_message(event_loop *loop, switch (type) { case SUBMIT_TASK: { task_spec *spec = (task_spec *) utarray_front(state->input_buffer); + /* Update the result table, which holds mappings of object ID -> ID of the + * task that created it. */ + if (state->db != NULL) { + task_id task_id = task_spec_id(spec); + for (int64_t i = 0; i < task_num_returns(spec); ++i) { + object_id return_id = task_return(spec, i); + result_table_add(state->db, return_id, task_id, NULL, NULL, NULL); + } + } + + /* Handle the task submission. */ if (actor_ids_equal(task_spec_actor_id(spec), NIL_ACTOR_ID)) { handle_task_submitted(state, state->algorithm_state, spec); } else { diff --git a/src/photon/photon_scheduler.h b/src/photon/photon_scheduler.h index 93c1df52f..3839de916 100644 --- a/src/photon/photon_scheduler.h +++ b/src/photon/photon_scheduler.h @@ -7,6 +7,10 @@ /* The duration between local scheduler heartbeats. */ #define LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS 100 +/* The duration that we wait after sending a worker SIGTERM before sending the + * worker SIGKILL. 
*/ +#define KILL_WORKER_TIMEOUT_MILLISECONDS 100 + #define DEFAULT_NUM_CPUS INT16_MAX #define DEFAULT_NUM_GPUS 0 diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index a90af4ad3..4f0268377 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -50,19 +50,12 @@ typedef struct { photon_conn **conns; } photon_mock; -photon_mock *init_photon_mock(bool connect_to_redis, - int num_workers, - int num_mock_workers) { +photon_mock *init_photon_mock(int num_workers, int num_mock_workers) { const char *node_ip_address = "127.0.0.1"; - const char *redis_addr = NULL; - int redis_port = -1; + const char *redis_addr = node_ip_address; + int redis_port = 6379; const double static_resource_conf[MAX_RESOURCE_INDEX] = {DEFAULT_NUM_CPUS, DEFAULT_NUM_GPUS}; - if (connect_to_redis) { - redis_addr = node_ip_address; - redis_port = 6379; - } - photon_mock *mock = malloc(sizeof(photon_mock)); memset(mock, 0, sizeof(photon_mock)); mock->loop = event_loop_create(); @@ -114,10 +107,25 @@ photon_mock *init_photon_mock(bool connect_to_redis, } void destroy_photon_mock(photon_mock *mock) { + /* Disconnect clients. */ for (int i = 0; i < mock->num_photon_conns; ++i) { photon_disconnect(mock->conns[i]); } free(mock->conns); + + /* Kill all the workers and run the event loop again so that the task table + * updates propagate and the tasks in progress are freed. */ + local_scheduler_client **worker = (local_scheduler_client **) utarray_eltptr( + mock->photon_state->workers, 0); + while (worker != NULL) { + kill_worker(*worker, true); + worker = (local_scheduler_client **) utarray_eltptr( + mock->photon_state->workers, 0); + } + event_loop_add_timer(mock->loop, 500, + (event_loop_timer_handler) timeout_handler, NULL); + event_loop_run(mock->loop); + /* This also frees mock->loop. 
*/ free_local_scheduler(mock->photon_state); close(mock->plasma_store_fd); @@ -138,7 +146,7 @@ void reset_worker(photon_mock *mock, local_scheduler_client *worker) { * value, the task should get assigned to a worker again. */ TEST object_reconstruction_test(void) { - photon_mock *photon = init_photon_mock(true, 0, 1); + photon_mock *photon = init_photon_mock(0, 1); photon_conn *worker = photon->conns[0]; /* Create a task with zero dependencies and one return value. */ @@ -207,7 +215,7 @@ TEST object_reconstruction_test(void) { * should trigger reconstruction of all previous tasks in the lineage. */ TEST object_reconstruction_recursive_test(void) { - photon_mock *photon = init_photon_mock(true, 0, 1); + photon_mock *photon = init_photon_mock(0, 1); photon_conn *worker = photon->conns[0]; /* Create a chain of tasks, each one dependent on the one before it. Mark * each object as available so that tasks will run immediately. */ @@ -316,7 +324,7 @@ void object_reconstruction_suppression_callback(object_id object_id, } TEST object_reconstruction_suppression_test(void) { - photon_mock *photon = init_photon_mock(true, 0, 1); + photon_mock *photon = init_photon_mock(0, 1); photon_conn *worker = photon->conns[0]; object_reconstruction_suppression_spec = example_task_spec(0, 1); @@ -365,7 +373,7 @@ TEST object_reconstruction_suppression_test(void) { } TEST task_dependency_test(void) { - photon_mock *photon = init_photon_mock(false, 0, 1); + photon_mock *photon = init_photon_mock(0, 1); local_scheduler_state *state = photon->photon_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state; /* Get the first worker. */ @@ -440,7 +448,7 @@ TEST task_dependency_test(void) { } TEST task_multi_dependency_test(void) { - photon_mock *photon = init_photon_mock(false, 0, 1); + photon_mock *photon = init_photon_mock(0, 1); local_scheduler_state *state = photon->photon_state; scheduling_algorithm_state *algorithm_state = state->algorithm_state; /* Get the first worker. 
*/ @@ -516,7 +524,7 @@ TEST task_multi_dependency_test(void) { TEST start_kill_workers_test(void) { /* Start some workers. */ int num_workers = 4; - photon_mock *photon = init_photon_mock(true, num_workers, 0); + photon_mock *photon = init_photon_mock(num_workers, 0); /* We start off with num_workers children processes, but no workers * registered yet. */ ASSERT_EQ(utarray_len(photon->photon_state->child_pids), num_workers); @@ -548,7 +556,7 @@ TEST start_kill_workers_test(void) { /* After killing a worker, its state is cleaned up. */ local_scheduler_client *worker = *(local_scheduler_client **) utarray_eltptr( photon->photon_state->workers, 0); - kill_worker(worker, true); + kill_worker(worker, false); ASSERT_EQ(utarray_len(photon->photon_state->child_pids), 0); ASSERT_EQ(utarray_len(photon->photon_state->workers), num_workers - 1); @@ -581,9 +589,9 @@ SUITE(photon_tests) { RUN_REDIS_TEST(object_reconstruction_test); RUN_REDIS_TEST(object_reconstruction_recursive_test); RUN_REDIS_TEST(object_reconstruction_suppression_test); - RUN_TEST(task_dependency_test); - RUN_TEST(task_multi_dependency_test); - RUN_TEST(start_kill_workers_test); + RUN_REDIS_TEST(task_dependency_test); + RUN_REDIS_TEST(task_multi_dependency_test); + RUN_REDIS_TEST(start_kill_workers_test); } GREATEST_MAIN_DEFS(); diff --git a/test/component_failures_test.py b/test/component_failures_test.py index e1bf0219a..f7d544908 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -70,5 +70,38 @@ class ComponentFailureTest(unittest.TestCase): self.assertTrue(ray.services.all_processes_alive(exclude=[ray.services.PROCESS_TYPE_WORKER])) ray.worker.cleanup() + def _testWorkerFailed(self, num_local_schedulers): + @ray.remote + def f(x): + time.sleep(0.5) + return x + + num_initial_workers = 4 + ray.worker._init(num_workers=num_initial_workers * num_local_schedulers, + num_local_schedulers=num_local_schedulers, + start_workers_from_local_scheduler=False, + start_ray_local=True, 
+ num_cpus=[num_initial_workers] * num_local_schedulers) + # Submit more tasks than there are workers so that all workers and cores + # are utilized. + object_ids = [f.remote(i) for i in range(num_initial_workers * num_local_schedulers)] + object_ids += [f.remote(object_id) for object_id in object_ids] + # Allow the tasks some time to begin executing. + time.sleep(0.1) + # Kill the workers as the tasks execute. + for worker in ray.services.all_processes[ray.services.PROCESS_TYPE_WORKER]: + worker.terminate() + time.sleep(0.1) + # Make sure that we can still get the objects after the executing tasks died. + ray.get(object_ids) + + ray.worker.cleanup() + + def testWorkerFailed(self): + self._testWorkerFailed(1) + + def testWorkerFailedMultinode(self): + self._testWorkerFailed(4) + if __name__ == "__main__": unittest.main(verbosity=2)