From 6828d694aef08d494989e0b60e61652da1563f97 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 29 Dec 2016 23:10:38 -0800 Subject: [PATCH] Test object notifications from Plasma store (#141) * Object notification test for Photon, and turn on valgrind for Photon C tests * Test object notification handler in the plasma manager * Fix hanging test case --- .travis.yml | 4 ++ src/photon/photon_algorithm.c | 1 + src/photon/photon_scheduler.c | 10 +++ src/photon/test/photon_tests.c | 105 +++++++++++++++++++++++++++----- src/plasma/plasma_manager.h | 10 +++ src/plasma/test/manager_tests.c | 45 ++++++++++++++ 6 files changed, 160 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index ba17b6fc3..0c182a5b8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -45,6 +45,10 @@ matrix: - make valgrind - cd ../.. + - cd src/photon + - make valgrind + - cd ../.. + - ./.travis/install-ray.sh script: - python src/plasma/test/test.py valgrind diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index 7066d3dee..1a08122b6 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -364,6 +364,7 @@ void handle_object_removed(local_scheduler_state *state, object_id object_id) { sizeof(object_id), entry); if (entry != NULL) { HASH_DELETE(handle, algorithm_state->local_objects, entry); + free(entry); } } diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c index 328ce0943..da0aa71c0 100644 --- a/src/photon/photon_scheduler.c +++ b/src/photon/photon_scheduler.c @@ -85,12 +85,22 @@ void free_local_scheduler(local_scheduler_state *state) { db_disconnect(state->db); } plasma_disconnect(state->plasma_conn); + worker_index *current_worker_index, *temp_worker_index; HASH_ITER(hh, state->worker_index, current_worker_index, temp_worker_index) { HASH_DEL(state->worker_index, current_worker_index); free(current_worker_index); } + + worker *w; + for (w = (worker *) utarray_front(state->workers); w != NULL; + w = (worker *) utarray_next(state->workers, w)) { + if (w->task_in_progress) { + free_task(w->task_in_progress); + } + } utarray_free(state->workers); + free_scheduling_algorithm_state(state->algorithm_state); utarray_free(state->input_buffer); event_loop_destroy(state->loop); diff --git a/src/photon/test/photon_tests.c b/src/photon/test/photon_tests.c index 833a83017..bdd816aa9 100644 --- a/src/photon/test/photon_tests.c +++ b/src/photon/test/photon_tests.c @@ -14,6 +14,7 @@ #include "utstring.h" #include "task.h" #include "state/object_table.h" +#include "state/task_table.h" #include "photon.h" #include "photon_scheduler.h" @@ -44,9 +45,13 @@ typedef struct { photon_conn *conn; } photon_mock; -photon_mock *init_photon_mock() { - const char *redis_addr = "127.0.0.1"; - int redis_port = 6379; +photon_mock *init_photon_mock(bool connect_to_redis) { + const char *redis_addr = NULL; + int redis_port = -1; + if (connect_to_redis) { + redis_addr = "127.0.0.1"; + redis_port = 6379; + } photon_mock *mock = malloc(sizeof(photon_mock)); memset(mock, 0, sizeof(photon_mock)); mock->loop = event_loop_create(); @@ -81,24 +86,28 @@ void destroy_photon_mock(photon_mock *mock) { free(mock); } +void reset_worker(photon_mock *mock, int worker_index) { + worker *available_worker = + (worker *) utarray_eltptr(mock->photon_state->workers, worker_index); + available_worker->task_in_progress = NULL; +} + /** * Test that object reconstruction gets called. If a task gets submitted, * assigned to a worker, and then reconstruction is triggered for its return * value, the task should get assigned to a worker again. */ TEST object_reconstruction_test(void) { - photon_mock *photon = init_photon_mock(); + photon_mock *photon = init_photon_mock(true); + /* Create a task with zero dependencies and one return value. */ + task_spec *spec = example_task_spec(0, 1); pid_t pid = fork(); if (pid == 0) { - /* Create a task with zero dependencies and one return value. */ - task_spec *spec = example_task_spec(0, 1); /* Make sure we receive the task twice. First from the initial submission, * and second from the reconstruct request. */ photon_submit(photon->conn, spec); task_spec *task_assigned = photon_get_task(photon->conn); ASSERT_EQ(memcmp(task_assigned, spec, task_spec_size(spec)), 0); - object_id return_id = task_return(spec, 0); - photon_reconstruct_object(photon->conn, return_id); task_spec *reconstruct_task = photon_get_task(photon->conn); ASSERT_EQ(memcmp(reconstruct_task, spec, task_spec_size(spec)), 0); /* Clean up. */ @@ -110,12 +119,25 @@ TEST object_reconstruction_test(void) { } else { /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ - event_loop_add_timer(photon->loop, 1000, + event_loop_add_timer(photon->loop, 500, + (event_loop_timer_handler) timeout_handler, NULL); + event_loop_run(photon->loop); + /* Set the task's status to TASK_STATUS_DONE to prevent the race condition + * that would suppress object reconstruction. */ + task *task = alloc_task(spec, TASK_STATUS_DONE, + get_db_client_id(photon->photon_state->db)); + task_table_add_task(photon->photon_state->db, task, + (retry_info *) &photon_retry, NULL, NULL); + /* Trigger reconstruction, and run the event loop again. */ + object_id return_id = task_return(spec, 0); + photon_reconstruct_object(photon->conn, return_id); + event_loop_add_timer(photon->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(photon->loop); /* Wait for the child process to exit and check that there are no tasks * left in the local scheduler's task queue. Then, clean up. */ wait(NULL); + free_task_spec(spec); ASSERT_EQ(num_tasks_in_queue(photon->photon_state->algorithm_state), 0); destroy_photon_mock(photon); PASS(); @@ -128,7 +150,7 @@ TEST object_reconstruction_test(void) { * should trigger reconstruction of all previous tasks in the lineage. */ TEST object_reconstruction_recursive_test(void) { - photon_mock *photon = init_photon_mock(); + photon_mock *photon = init_photon_mock(true); /* Create a chain of tasks, each one dependent on the one before it. Mark * each object as available so that tasks will run immediately. */ const int NUM_TASKS = 10; @@ -153,9 +175,6 @@ TEST object_reconstruction_recursive_test(void) { 0); free_task_spec(task_assigned); } - /* Request reconstruction of the last return object. */ - object_id return_id = task_return(specs[NUM_TASKS - 1], 0); - photon_reconstruct_object(photon->conn, return_id); /* Check that the workers receive all tasks in the final return object's * lineage during reconstruction. */ for (int i = 0; i < NUM_TASKS; ++i) { @@ -180,7 +199,19 @@ TEST object_reconstruction_recursive_test(void) { } else { /* Run the event loop. NOTE: OSX appears to require the parent process to * listen for events on the open file descriptors. */ - event_loop_add_timer(photon->loop, 1000, + event_loop_add_timer(photon->loop, 500, + (event_loop_timer_handler) timeout_handler, NULL); + event_loop_run(photon->loop); + /* Set the final task's status to TASK_STATUS_DONE to prevent the race + * condition that would suppress object reconstruction. */ + task *last_task = alloc_task(specs[NUM_TASKS - 1], TASK_STATUS_DONE, + get_db_client_id(photon->photon_state->db)); + task_table_add_task(photon->photon_state->db, last_task, + (retry_info *) &photon_retry, NULL, NULL); + /* Trigger reconstruction, and run the event loop again. */ + object_id return_id = task_return(specs[NUM_TASKS - 1], 0); + photon_reconstruct_object(photon->conn, return_id); + event_loop_add_timer(photon->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(photon->loop); /* Wait for the child process to exit and check that there are no tasks @@ -209,7 +240,7 @@ void object_reconstruction_suppression_callback(object_id object_id, } TEST object_reconstruction_suppression_test(void) { - photon_mock *photon = init_photon_mock(); + photon_mock *photon = init_photon_mock(true); object_reconstruction_suppression_spec = example_task_spec(0, 1); object_id return_id = task_return(object_reconstruction_suppression_spec, 0); pid_t pid = fork(); @@ -255,10 +286,54 @@ TEST object_reconstruction_suppression_test(void) { } } +TEST object_notifications_test(void) { + photon_mock *photon = init_photon_mock(false); + local_scheduler_state *state = photon->photon_state; + scheduling_algorithm_state *algorithm_state = state->algorithm_state; + int worker_index = 0; + task_spec *spec = example_task_spec(1, 1); + object_id oid = task_arg_id(spec, 0); + + /* Check that the task gets queued if the task is submitted and a worker is + * available, but the input is not. Once the input is available, the task + * gets assigned. */ + handle_task_submitted(state, algorithm_state, spec); + handle_worker_available(state, algorithm_state, worker_index); + ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1); + handle_object_available(state, algorithm_state, oid); + ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0); + reset_worker(photon, worker_index); + + /* Check that the task gets queued if the task is submitted and the input is + * available, but no worker is available yet. Once a worker is available, the + * task gets assigned. */ + handle_task_submitted(state, algorithm_state, spec); + ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1); + handle_worker_available(state, algorithm_state, worker_index); + ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0); + reset_worker(photon, worker_index); + + /* If an object gets removed, check the first scenario again, where the task + * gets queued if the task is submitted and a worker is available, but the + * input is not. Once the input is made available again, the task gets + * assigned. */ + handle_object_removed(state, oid); + handle_task_submitted(state, algorithm_state, spec); + handle_worker_available(state, algorithm_state, worker_index); + ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1); + handle_object_available(state, algorithm_state, oid); + ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0); + + free_task_spec(spec); + destroy_photon_mock(photon); + PASS(); +} + SUITE(photon_tests) { RUN_REDIS_TEST(object_reconstruction_test); RUN_REDIS_TEST(object_reconstruction_recursive_test); RUN_REDIS_TEST(object_reconstruction_suppression_test); + RUN_TEST(object_notifications_test); } GREATEST_MAIN_DEFS(); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 0b527efe3..826e5b2a1 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -261,4 +261,14 @@ event_loop *get_event_loop(plasma_manager_state *state); */ int get_client_sock(client_connection *conn); +/** + * Return whether or not the object is local. + * + * @param state The state of the plasma manager. + * @param object_id The ID of the object we want to find. + * @return A bool that is true if the requested object is local and false + * otherwise. + */ +bool is_object_local(plasma_manager_state *state, object_id object_id); + #endif /* PLASMA_MANAGER_H */ diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index 1e34473cb..a3e4fa74a 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "common.h" #include "test/test_common.h" @@ -244,11 +245,55 @@ TEST read_write_object_chunk_test(void) { PASS(); } +TEST object_notifications_test(void) { + plasma_mock *local_mock = init_plasma_mock(NULL); + /* Open a non-blocking socket pair to mock the object notifications from the + * plasma store. */ + int fd[2]; + socketpair(AF_UNIX, SOCK_STREAM, 0, fd); + int flags = fcntl(fd[1], F_GETFL, 0); + CHECK(fcntl(fd[1], F_SETFL, flags | O_NONBLOCK) == 0); + + object_id oid = globally_unique_id(); + object_info info = {.obj_id = oid, + .data_size = 10, + .metadata_size = 1, + .create_time = 0, + .construct_duration = 0, + .digest = {0}, + .is_deletion = false}; + + /* Check that the object is not local at first. */ + bool is_local = is_object_local(local_mock->state, oid); + ASSERT(!is_local); + + /* Check that the object is local after receiving an object notification. */ + send(fd[1], (char const *) &info, sizeof(info), 0); + process_object_notification(local_mock->loop, fd[0], local_mock->state, 0); + is_local = is_object_local(local_mock->state, oid); + ASSERT(is_local); + + /* Check that the object is not local after receiving a notification about + * the object deletion. */ + info.is_deletion = true; + send(fd[1], (char const *) &info, sizeof(info), 0); + process_object_notification(local_mock->loop, fd[0], local_mock->state, 0); + is_local = is_object_local(local_mock->state, oid); + ASSERT(!is_local); + + /* Clean up. */ + close(fd[0]); + close(fd[1]); + destroy_plasma_mock(local_mock); + PASS(); +} + SUITE(plasma_manager_tests) { memset(&oid, 1, sizeof(oid)); RUN_TEST(request_transfer_test); RUN_TEST(request_transfer_retry_test); RUN_TEST(read_write_object_chunk_test); + RUN_TEST(object_notifications_test); } GREATEST_MAIN_DEFS();