From b3c05655a05c6b0fac13b26387abb1ea8129bc8d Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 6 Dec 2016 15:47:31 -0800 Subject: [PATCH] Enable fetching objects from remote object stores. (#87) * Fetch missing dependencies from local scheduler. * Factor out global scheduler policy state. * Use object_table_subscribe instead of object_table_lookup. * Fix bug in which timer was being created twice for a single fetch request. * Free old manager vector. --- lib/python/ray/worker.py | 1 + src/global_scheduler/global_scheduler.c | 6 +- src/global_scheduler/global_scheduler.h | 4 + .../global_scheduler_algorithm.c | 28 ++++++- .../global_scheduler_algorithm.h | 41 +++++++++- src/photon/photon_algorithm.c | 77 +++++++++++++++++++ src/plasma/plasma_manager.c | 49 ++++++------ test/array_test.py | 2 +- 8 files changed, 173 insertions(+), 35 deletions(-) diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index 7435d63b5..f801055a2 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -437,6 +437,7 @@ class Worker(object): Args: objectid (object_id.ObjectID): The object ID of the value to retrieve. """ + self.plasma_client.fetch2([objectid.id()]) buff = self.plasma_client.get(objectid.id()) metadata = self.plasma_client.get_metadata(objectid.id()) metadata_size = len(metadata) diff --git a/src/global_scheduler/global_scheduler.c b/src/global_scheduler/global_scheduler.c index 40e3269ac..ccaca61de 100644 --- a/src/global_scheduler/global_scheduler.c +++ b/src/global_scheduler/global_scheduler.c @@ -33,12 +33,14 @@ global_scheduler_state *init_global_scheduler(event_loop *loop, state->db = db_connect(redis_addr, redis_port, "global_scheduler", "", -1); db_attach(state->db, loop, false); utarray_new(state->local_schedulers, &local_scheduler_icd); + state->policy_state = init_global_scheduler_policy(); return state; } void free_global_scheduler(global_scheduler_state *state) { db_disconnect(state->db); utarray_free(state->local_schedulers); + destroy_global_scheduler_policy(state->policy_state); free(state); } @@ -57,7 +59,7 @@ void signal_handler(int signal) { void process_task_waiting(task *task, void *user_context) { global_scheduler_state *state = (global_scheduler_state *) user_context; - handle_task_waiting(state, task); + handle_task_waiting(state, state->policy_state, task); } void process_new_db_client(db_client_id db_client_id, @@ -65,7 +67,7 @@ void process_new_db_client(db_client_id db_client_id, void *user_context) { global_scheduler_state *state = (global_scheduler_state *) user_context; if (strcmp(client_type, "photon") == 0) { - handle_new_local_scheduler(state, db_client_id); + handle_new_local_scheduler(state, state->policy_state, db_client_id); } } diff --git a/src/global_scheduler/global_scheduler.h b/src/global_scheduler/global_scheduler.h index d9b97ea67..435f36d1b 100644 --- a/src/global_scheduler/global_scheduler.h +++ b/src/global_scheduler/global_scheduler.h @@ -12,6 +12,8 @@ typedef struct { db_client_id id; } local_scheduler; +typedef struct global_scheduler_policy_state global_scheduler_policy_state; + typedef struct { /** The global scheduler event loop. */ event_loop *loop; @@ -19,6 +21,8 @@ typedef struct { db_handle *db; /** The local schedulers that are connected to Redis. */ UT_array *local_schedulers; + /** The state managed by the scheduling policy. */ + global_scheduler_policy_state *policy_state; } global_scheduler_state; void assign_task_to_local_scheduler(global_scheduler_state *state, diff --git a/src/global_scheduler/global_scheduler_algorithm.c b/src/global_scheduler/global_scheduler_algorithm.c index 362494ba2..4399cf7a5 100644 --- a/src/global_scheduler/global_scheduler_algorithm.c +++ b/src/global_scheduler/global_scheduler_algorithm.c @@ -3,10 +3,26 @@ #include "global_scheduler_algorithm.h" -void handle_task_waiting(global_scheduler_state *state, task *task) { +global_scheduler_policy_state *init_global_scheduler_policy(void) { + global_scheduler_policy_state *policy_state = + malloc(sizeof(global_scheduler_policy_state)); + policy_state->round_robin_index = 0; + return policy_state; +} + +void destroy_global_scheduler_policy( + global_scheduler_policy_state *policy_state) { + free(policy_state); +} + +void handle_task_waiting(global_scheduler_state *state, + global_scheduler_policy_state *policy_state, + task *task) { if (utarray_len(state->local_schedulers) > 0) { - local_scheduler *scheduler = - (local_scheduler *) utarray_eltptr(state->local_schedulers, 0); + local_scheduler *scheduler = (local_scheduler *) utarray_eltptr( + state->local_schedulers, policy_state->round_robin_index); + policy_state->round_robin_index += 1; + policy_state->round_robin_index %= utarray_len(state->local_schedulers); assign_task_to_local_scheduler(state, task, scheduler->id); } else { CHECKM(0, "We currently don't handle this case."); @@ -14,15 +30,19 @@ void handle_task_waiting(global_scheduler_state *state, task *task) { } void handle_object_available(global_scheduler_state *state, + global_scheduler_policy_state *policy_state, object_id object_id) { /* Do nothing for now. */ } -void handle_local_scheduler_heartbeat(global_scheduler_state *state) { +void handle_local_scheduler_heartbeat( + global_scheduler_state *state, + global_scheduler_policy_state *policy_state) { /* Do nothing for now. */ } void handle_new_local_scheduler(global_scheduler_state *state, + global_scheduler_policy_state *policy_state, db_client_id db_client_id) { local_scheduler local_scheduler; memset(&local_scheduler, 0, sizeof(local_scheduler)); diff --git a/src/global_scheduler/global_scheduler_algorithm.h b/src/global_scheduler/global_scheduler_algorithm.h index 86f089006..3ea8b6ffc 100644 --- a/src/global_scheduler/global_scheduler_algorithm.h +++ b/src/global_scheduler/global_scheduler_algorithm.h @@ -13,25 +13,53 @@ * */ +/** The state managed by the global scheduling policy. */ +struct global_scheduler_policy_state { + /** The index of the next local scheduler to assign a task to. */ + int64_t round_robin_index; +}; + +/** + * Create the state of the global scheduler policy. This state must be freed by + * the caller. + * + * @return The state of the scheduling policy. + */ +global_scheduler_policy_state *init_global_scheduler_policy(void); + +/** + * Free the global scheduler policy state. + * + * @param policy_state The policy state to free. + * @return Void. + */ +void destroy_global_scheduler_policy( + global_scheduler_policy_state *policy_state); + /** * Assign the task to a local scheduler. At the moment, this simply assigns the - * task to the first local scheduler and if there are no local schedulers it - * fails. + * task to the local schedulers in a round robin fashion. If there are no local + * schedulers it fails. * * @param state The global scheduler state. + * @param policy_state The state managed by the scheduling policy. * @param task The task that is waiting to be scheduled. * @return Void. */ -void handle_task_waiting(global_scheduler_state *state, task *task); +void handle_task_waiting(global_scheduler_state *state, + global_scheduler_policy_state *policy_state, + task *task); /** * Handle the fact that a new object is available. * * @param state The global scheduler state. + * @param policy_state The state managed by the scheduling policy. * @param object_id The ID of the object that is now available. * @return Void. */ void handle_object_available(global_scheduler_state *state, + global_scheduler_policy_state *policy_state, object_id object_id); /** @@ -39,19 +67,24 @@ void handle_object_available(global_scheduler_state *state, * placeholder for now. * * @param state The global scheduler state. + * @param policy_state The state managed by the scheduling policy. * @return Void. */ -void handle_local_scheduler_heartbeat(global_scheduler_state *state); +void handle_local_scheduler_heartbeat( + global_scheduler_state *state, + global_scheduler_policy_state *policy_state); /** * Handle the presence of a new local scheduler. Currently, this just adds the * local scheduler to a queue of local schedulers. * * @param state The global scheduler state. + * @param policy_state The state managed by the scheduling policy. * @param The db client ID of the new local scheduler. * @return Void. */ void handle_new_local_scheduler(global_scheduler_state *state, + global_scheduler_policy_state *policy_state, db_client_id db_client_id); #endif /* GLOBAL_SCHEDULER_ALGORITHM_H */ diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 982a2cda4..c2827e1ba 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -28,6 +28,20 @@ typedef struct { UT_hash_handle handle; } available_object; +/** A data structure used to track which objects are being fetched. */ +typedef struct { + /** The object ID that we are trying to fetch. */ + object_id object_id; + /** The local scheduler state. */ + local_scheduler_state *state; + /** The scheduling algorithm state. */ + scheduling_algorithm_state *algorithm_state; + /** The ID for the timer that will time out the current request. */ + int64_t timer; + /** Handle for the uthash table. */ + UT_hash_handle hh; +} fetch_object_request; + /** Part of the photon state that is maintained by the scheduling algorithm. */ struct scheduling_algorithm_state { /** An array of pointers to tasks that are waiting to be scheduled. */ @@ -38,6 +52,9 @@ struct scheduling_algorithm_state { /** A hash map of the objects that are available in the local Plasma store. * This information could be a little stale. */ available_object *local_objects; + /** A hash map of the objects that are currently being fetched by this local + * scheduler. The key is the object ID. */ + fetch_object_request *fetch_requests; }; scheduling_algorithm_state *make_scheduling_algorithm_state(void) { @@ -48,6 +65,8 @@ scheduling_algorithm_state *make_scheduling_algorithm_state(void) { /* Initialize the local data structures used for queuing tasks and workers. */ algorithm_state->task_queue = NULL; utarray_new(algorithm_state->available_workers, &ut_int_icd); + /* Initialize the hash table of objects being fetched. */ + algorithm_state->fetch_requests = NULL; return algorithm_state; } @@ -65,6 +84,11 @@ void free_scheduling_algorithm_state( HASH_DELETE(handle, algorithm_state->local_objects, available_obj); free(available_obj); } + fetch_object_request *fetch_elt, *tmp_fetch_elt; + HASH_ITER(hh, algorithm_state->fetch_requests, fetch_elt, tmp_fetch_elt) { + HASH_DELETE(hh, algorithm_state->fetch_requests, fetch_elt); + free(fetch_elt); + } free(algorithm_state); } @@ -95,6 +119,46 @@ bool can_run(scheduling_algorithm_state *algorithm_state, task_spec *task) { return true; } +/* TODO(rkn): This method will need to be changed to call reconstruct. */ +int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { + fetch_object_request *fetch_req = (fetch_object_request *) context; + object_id object_ids[1] = {fetch_req->object_id}; + plasma_fetch2(fetch_req->state->plasma_conn, 1, object_ids); + return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS; +} + +void fetch_missing_dependencies(local_scheduler_state *state, + scheduling_algorithm_state *algorithm_state, + task_spec *spec) { + int64_t num_args = task_num_args(spec); + for (int i = 0; i < num_args; ++i) { + if (task_arg_type(spec, i) == ARG_BY_REF) { + object_id obj_id = task_arg_id(spec, i); + available_object *entry; + HASH_FIND(handle, algorithm_state->local_objects, &obj_id, sizeof(obj_id), + entry); + if (entry == NULL) { + /* The object is not present locally, fetch the object. */ + object_id object_ids[1] = {obj_id}; + plasma_fetch2(state->plasma_conn, 1, object_ids); + /* Create a fetch request and add a timer to the event loop to ensure + * that the fetch actually happens. */ + fetch_object_request *fetch_req = malloc(sizeof(fetch_object_request)); + fetch_req->object_id = obj_id; + fetch_req->state = state; + fetch_req->algorithm_state = algorithm_state; + fetch_req->timer = event_loop_add_timer( + state->loop, LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS, + fetch_object_timeout_handler, fetch_req); + /* The fetch request will be freed and removed from the hash table in + * handle_object_available when the object becomes available locally. */ + HASH_ADD(hh, algorithm_state->fetch_requests, object_id, + sizeof(fetch_req->object_id), fetch_req); + } + } + } +} + /** * If there is a task whose dependencies are available locally, assign it to the * worker. This does not remove the worker from the available worker queue. @@ -192,6 +256,8 @@ void handle_task_scheduled(local_scheduler_state *state, * the global scheduler, so we can safely assert that there is a connection * to the database. */ DCHECK(state->db != NULL); + /* Initiate fetch calls for any dependencies that are not present locally. */ + fetch_missing_dependencies(state, algorithm_state, spec); /* If this task's dependencies are available locally, and if there is an * available worker, then assign this task to an available worker. If we * cannot assign the task to a worker immediately, queue the task locally. */ @@ -248,4 +314,15 @@ void handle_object_available(local_scheduler_state *state, num_tasks_scheduled += 1; } utarray_erase(algorithm_state->available_workers, 0, num_tasks_scheduled); + + /* If we were previously trying to fetch this object, remove the fetch request + * from the hash table. */ + fetch_object_request *fetch_req; + HASH_FIND(hh, algorithm_state->fetch_requests, &object_id, sizeof(object_id), + fetch_req); + if (fetch_req != NULL) { + HASH_DELETE(hh, algorithm_state->fetch_requests, fetch_req); + CHECK(event_loop_remove_timer(state->loop, fetch_req->timer) == AE_OK); + free(fetch_req); + } } diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 5d311a9de..cbf2d3811 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -148,9 +148,6 @@ typedef struct { /** The ID for the timer that will time out the current request to the state * database or another plasma manager. */ int64_t timer; - /** How many retries we have left for the request. Decremented on every - * timeout. */ - int num_retries; /** Pointer to the array containing the manager locations of this object. This * struct owns and must free each entry. */ char **manager_vector; @@ -381,7 +378,8 @@ void remove_fetch_request(plasma_manager_state *manager_state, HASH_DELETE(hh, manager_state->fetch_requests2, fetch_req); /* Remove the timer associated with this fetch request. */ if (fetch_req->timer != -1) { - event_loop_remove_timer(manager_state->loop, fetch_req->timer); + CHECK(event_loop_remove_timer(manager_state->loop, fetch_req->timer) == + AE_OK); } /* Free the fetch request and everything in it. */ for (int i = 0; i < fetch_req->manager_count; ++i) { @@ -845,16 +843,8 @@ int manager_timeout_handler(event_loop *loop, timer_id id, void *context) { int manager_timeout_handler2(event_loop *loop, timer_id id, void *context) { fetch_request2 *fetch_req = context; plasma_manager_state *manager_state = fetch_req->manager_state; - LOG_DEBUG("Timer went off, %d tries left", fetch_req->num_retries); - if (fetch_req->num_retries > 0) { - request_transfer_from2(manager_state, fetch_req->object_id); - fetch_req->num_retries--; - return MANAGER_TIMEOUT; - } - /* TODO(rkn): This shouldn't be fatal. Instead, it should do nothing. */ - CHECK(0); - remove_fetch_request(manager_state, fetch_req); - return EVENT_LOOP_TIMER_DONE; + request_transfer_from2(manager_state, fetch_req->object_id); + return MANAGER_TIMEOUT; } bool is_object_local(plasma_manager_state *state, object_id object_id) { @@ -919,6 +909,9 @@ void request_transfer2(object_id object_id, const char *manager_vector[], void *context) { plasma_manager_state *manager_state = (plasma_manager_state *) context; + /* This callback is called from object_table_subscribe, which guarantees that + * the manager vector contains at least one element. */ + CHECK(manager_count >= 1); fetch_request2 *fetch_req; HASH_FIND(hh, manager_state->fetch_requests2, &object_id, sizeof(object_id), fetch_req); @@ -936,12 +929,15 @@ void request_transfer2(object_id object_id, * callback gets called. */ CHECK(fetch_req != NULL); - if (manager_count == 0) { - /* TODO(rkn): Figure out what to do in this case. */ - remove_fetch_request(manager_state, fetch_req); - return; + /* This method may be run multiple times, so if we are updating the manager + * vector, we need to free the previous manager vector. */ + if (fetch_req->manager_count != 0) { + for (int i = 0; i < fetch_req->manager_count; ++i) { + free(fetch_req->manager_vector[i]); + } + free(fetch_req->manager_vector); } - /* Pick a different manager to request a transfer from on every attempt. */ + /* Update the manager vector. */ fetch_req->manager_count = manager_count; fetch_req->manager_vector = malloc(manager_count * sizeof(char *)); fetch_req->next_manager = 0; @@ -955,9 +951,13 @@ void request_transfer2(object_id object_id, /* Wait for the object data for the default number of retries, which timeout * after a default interval. */ request_transfer_from2(manager_state, object_id); - fetch_req->num_retries = NUM_RETRIES; - fetch_req->timer = event_loop_add_timer(manager_state->loop, MANAGER_TIMEOUT, - manager_timeout_handler2, fetch_req); + /* It is possible for this method to be called multiple times, but we only + * need to create a timer once. */ + if (fetch_req->timer == -1) { + fetch_req->timer = + event_loop_add_timer(manager_state->loop, MANAGER_TIMEOUT, + manager_timeout_handler2, fetch_req); + } } void process_fetch_request(client_connection *client_conn, @@ -1041,8 +1041,8 @@ void process_fetch_requests2(client_connection *client_conn, retry.num_retries = NUM_RETRIES; retry.timeout = MANAGER_TIMEOUT; retry.fail_callback = fatal_table_callback; - object_table_lookup(manager_state->db, obj_id, &retry, request_transfer2, - manager_state); + object_table_subscribe(manager_state->db, obj_id, request_transfer2, + manager_state, &retry, NULL, NULL); } } @@ -1520,6 +1520,7 @@ void process_object_notification(event_loop *loop, HASH_FIND(hh, state->fetch_requests2, &obj_id, sizeof(obj_id), fetch_req); if (fetch_req != NULL) { remove_fetch_request(state, fetch_req); + /* TODO(rkn): We also really should unsubscribe from the object table. */ } /* Notify any clients who were waiting on a fetch to this object and tick * off objects we are waiting for. */ diff --git a/test/array_test.py b/test/array_test.py index c09f804b3..8e2d1e84d 100644 --- a/test/array_test.py +++ b/test/array_test.py @@ -58,7 +58,7 @@ class DistributedArrayTest(unittest.TestCase): def testMethods(self): for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]: reload(module) - ray.init(start_ray_local=True, num_workers=10) + ray.init(start_ray_local=True, num_workers=10, num_local_schedulers=2) x = da.zeros.remote([9, 25, 51], "float") assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51]))