Test object notifications from Plasma store (#141)

* Object notification test for Photon, and turn on valgrind for Photon C tests

* Test object notification handler in the plasma manager

* Fix hanging test case
This commit is contained in:
Stephanie Wang
2016-12-29 23:10:38 -08:00
committed by Robert Nishihara
parent f9f667de47
commit 6828d694ae
6 changed files with 160 additions and 15 deletions
+4
View File
@@ -45,6 +45,10 @@ matrix:
- make valgrind
- cd ../..
- cd src/photon
- make valgrind
- cd ../..
- ./.travis/install-ray.sh
script:
- python src/plasma/test/test.py valgrind
+1
View File
@@ -364,6 +364,7 @@ void handle_object_removed(local_scheduler_state *state, object_id object_id) {
sizeof(object_id), entry);
if (entry != NULL) {
HASH_DELETE(handle, algorithm_state->local_objects, entry);
free(entry);
}
}
+10
View File
@@ -85,12 +85,22 @@ void free_local_scheduler(local_scheduler_state *state) {
db_disconnect(state->db);
}
plasma_disconnect(state->plasma_conn);
worker_index *current_worker_index, *temp_worker_index;
HASH_ITER(hh, state->worker_index, current_worker_index, temp_worker_index) {
HASH_DEL(state->worker_index, current_worker_index);
free(current_worker_index);
}
worker *w;
for (w = (worker *) utarray_front(state->workers); w != NULL;
w = (worker *) utarray_next(state->workers, w)) {
if (w->task_in_progress) {
free_task(w->task_in_progress);
}
}
utarray_free(state->workers);
free_scheduling_algorithm_state(state->algorithm_state);
utarray_free(state->input_buffer);
event_loop_destroy(state->loop);
+90 -15
View File
@@ -14,6 +14,7 @@
#include "utstring.h"
#include "task.h"
#include "state/object_table.h"
#include "state/task_table.h"
#include "photon.h"
#include "photon_scheduler.h"
@@ -44,9 +45,13 @@ typedef struct {
photon_conn *conn;
} photon_mock;
photon_mock *init_photon_mock() {
const char *redis_addr = "127.0.0.1";
int redis_port = 6379;
photon_mock *init_photon_mock(bool connect_to_redis) {
const char *redis_addr = NULL;
int redis_port = -1;
if (connect_to_redis) {
redis_addr = "127.0.0.1";
redis_port = 6379;
}
photon_mock *mock = malloc(sizeof(photon_mock));
memset(mock, 0, sizeof(photon_mock));
mock->loop = event_loop_create();
@@ -81,24 +86,28 @@ void destroy_photon_mock(photon_mock *mock) {
free(mock);
}
void reset_worker(photon_mock *mock, int worker_index) {
worker *available_worker =
(worker *) utarray_eltptr(mock->photon_state->workers, worker_index);
available_worker->task_in_progress = NULL;
}
/**
* Test that object reconstruction gets called. If a task gets submitted,
* assigned to a worker, and then reconstruction is triggered for its return
* value, the task should get assigned to a worker again.
*/
TEST object_reconstruction_test(void) {
photon_mock *photon = init_photon_mock();
photon_mock *photon = init_photon_mock(true);
/* Create a task with zero dependencies and one return value. */
task_spec *spec = example_task_spec(0, 1);
pid_t pid = fork();
if (pid == 0) {
/* Create a task with zero dependencies and one return value. */
task_spec *spec = example_task_spec(0, 1);
/* Make sure we receive the task twice. First from the initial submission,
* and second from the reconstruct request. */
photon_submit(photon->conn, spec);
task_spec *task_assigned = photon_get_task(photon->conn);
ASSERT_EQ(memcmp(task_assigned, spec, task_spec_size(spec)), 0);
object_id return_id = task_return(spec, 0);
photon_reconstruct_object(photon->conn, return_id);
task_spec *reconstruct_task = photon_get_task(photon->conn);
ASSERT_EQ(memcmp(reconstruct_task, spec, task_spec_size(spec)), 0);
/* Clean up. */
@@ -110,12 +119,25 @@ TEST object_reconstruction_test(void) {
} else {
/* Run the event loop. NOTE: OSX appears to require the parent process to
* listen for events on the open file descriptors. */
event_loop_add_timer(photon->loop, 1000,
event_loop_add_timer(photon->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(photon->loop);
/* Set the task's status to TASK_STATUS_DONE to prevent the race condition
* that would suppress object reconstruction. */
task *task = alloc_task(spec, TASK_STATUS_DONE,
get_db_client_id(photon->photon_state->db));
task_table_add_task(photon->photon_state->db, task,
(retry_info *) &photon_retry, NULL, NULL);
/* Trigger reconstruction, and run the event loop again. */
object_id return_id = task_return(spec, 0);
photon_reconstruct_object(photon->conn, return_id);
event_loop_add_timer(photon->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(photon->loop);
/* Wait for the child process to exit and check that there are no tasks
* left in the local scheduler's task queue. Then, clean up. */
wait(NULL);
free_task_spec(spec);
ASSERT_EQ(num_tasks_in_queue(photon->photon_state->algorithm_state), 0);
destroy_photon_mock(photon);
PASS();
@@ -128,7 +150,7 @@ TEST object_reconstruction_test(void) {
* should trigger reconstruction of all previous tasks in the lineage.
*/
TEST object_reconstruction_recursive_test(void) {
photon_mock *photon = init_photon_mock();
photon_mock *photon = init_photon_mock(true);
/* Create a chain of tasks, each one dependent on the one before it. Mark
* each object as available so that tasks will run immediately. */
const int NUM_TASKS = 10;
@@ -153,9 +175,6 @@ TEST object_reconstruction_recursive_test(void) {
0);
free_task_spec(task_assigned);
}
/* Request reconstruction of the last return object. */
object_id return_id = task_return(specs[NUM_TASKS - 1], 0);
photon_reconstruct_object(photon->conn, return_id);
/* Check that the workers receive all tasks in the final return object's
* lineage during reconstruction. */
for (int i = 0; i < NUM_TASKS; ++i) {
@@ -180,7 +199,19 @@ TEST object_reconstruction_recursive_test(void) {
} else {
/* Run the event loop. NOTE: OSX appears to require the parent process to
* listen for events on the open file descriptors. */
event_loop_add_timer(photon->loop, 1000,
event_loop_add_timer(photon->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(photon->loop);
/* Set the final task's status to TASK_STATUS_DONE to prevent the race
* condition that would suppress object reconstruction. */
task *last_task = alloc_task(specs[NUM_TASKS - 1], TASK_STATUS_DONE,
get_db_client_id(photon->photon_state->db));
task_table_add_task(photon->photon_state->db, last_task,
(retry_info *) &photon_retry, NULL, NULL);
/* Trigger reconstruction, and run the event loop again. */
object_id return_id = task_return(specs[NUM_TASKS - 1], 0);
photon_reconstruct_object(photon->conn, return_id);
event_loop_add_timer(photon->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(photon->loop);
/* Wait for the child process to exit and check that there are no tasks
@@ -209,7 +240,7 @@ void object_reconstruction_suppression_callback(object_id object_id,
}
TEST object_reconstruction_suppression_test(void) {
photon_mock *photon = init_photon_mock();
photon_mock *photon = init_photon_mock(true);
object_reconstruction_suppression_spec = example_task_spec(0, 1);
object_id return_id = task_return(object_reconstruction_suppression_spec, 0);
pid_t pid = fork();
@@ -255,10 +286,54 @@ TEST object_reconstruction_suppression_test(void) {
}
}
TEST object_notifications_test(void) {
photon_mock *photon = init_photon_mock(false);
local_scheduler_state *state = photon->photon_state;
scheduling_algorithm_state *algorithm_state = state->algorithm_state;
int worker_index = 0;
task_spec *spec = example_task_spec(1, 1);
object_id oid = task_arg_id(spec, 0);
/* Check that the task gets queued if the task is submitted and a worker is
* available, but the input is not. Once the input is available, the task
* gets assigned. */
handle_task_submitted(state, algorithm_state, spec);
handle_worker_available(state, algorithm_state, worker_index);
ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1);
handle_object_available(state, algorithm_state, oid);
ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0);
reset_worker(photon, worker_index);
/* Check that the task gets queued if the task is submitted and the input is
* available, but no worker is available yet. Once a worker is available, the
* task gets assigned. */
handle_task_submitted(state, algorithm_state, spec);
ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1);
handle_worker_available(state, algorithm_state, worker_index);
ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0);
reset_worker(photon, worker_index);
/* If an object gets removed, check the first scenario again, where the task
* gets queued if the task is submitted and a worker is available, but the
* input is not. Once the input is made available again, the task gets
* assigned. */
handle_object_removed(state, oid);
handle_task_submitted(state, algorithm_state, spec);
handle_worker_available(state, algorithm_state, worker_index);
ASSERT_EQ(num_tasks_in_queue(algorithm_state), 1);
handle_object_available(state, algorithm_state, oid);
ASSERT_EQ(num_tasks_in_queue(algorithm_state), 0);
free_task_spec(spec);
destroy_photon_mock(photon);
PASS();
}
SUITE(photon_tests) {
RUN_REDIS_TEST(object_reconstruction_test);
RUN_REDIS_TEST(object_reconstruction_recursive_test);
RUN_REDIS_TEST(object_reconstruction_suppression_test);
RUN_TEST(object_notifications_test);
}
GREATEST_MAIN_DEFS();
+10
View File
@@ -261,4 +261,14 @@ event_loop *get_event_loop(plasma_manager_state *state);
*/
int get_client_sock(client_connection *conn);
/**
* Return whether or not the object is local.
*
* @param state The state of the plasma manager.
* @param object_id The ID of the object we want to find.
* @return A bool that is true if the requested object is local and false
* otherwise.
*/
bool is_object_local(plasma_manager_state *state, object_id object_id);
#endif /* PLASMA_MANAGER_H */
+45
View File
@@ -5,6 +5,7 @@
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <fcntl.h>
#include "common.h"
#include "test/test_common.h"
@@ -244,11 +245,55 @@ TEST read_write_object_chunk_test(void) {
PASS();
}
TEST object_notifications_test(void) {
plasma_mock *local_mock = init_plasma_mock(NULL);
/* Open a non-blocking socket pair to mock the object notifications from the
* plasma store. */
int fd[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
int flags = fcntl(fd[1], F_GETFL, 0);
CHECK(fcntl(fd[1], F_SETFL, flags | O_NONBLOCK) == 0);
object_id oid = globally_unique_id();
object_info info = {.obj_id = oid,
.data_size = 10,
.metadata_size = 1,
.create_time = 0,
.construct_duration = 0,
.digest = {0},
.is_deletion = false};
/* Check that the object is not local at first. */
bool is_local = is_object_local(local_mock->state, oid);
ASSERT(!is_local);
/* Check that the object is local after receiving an object notification. */
send(fd[1], (char const *) &info, sizeof(info), 0);
process_object_notification(local_mock->loop, fd[0], local_mock->state, 0);
is_local = is_object_local(local_mock->state, oid);
ASSERT(is_local);
/* Check that the object is not local after receiving a notification about
* the object deletion. */
info.is_deletion = true;
send(fd[1], (char const *) &info, sizeof(info), 0);
process_object_notification(local_mock->loop, fd[0], local_mock->state, 0);
is_local = is_object_local(local_mock->state, oid);
ASSERT(!is_local);
/* Clean up. */
close(fd[0]);
close(fd[1]);
destroy_plasma_mock(local_mock);
PASS();
}
SUITE(plasma_manager_tests) {
memset(&oid, 1, sizeof(oid));
RUN_TEST(request_transfer_test);
RUN_TEST(request_transfer_retry_test);
RUN_TEST(read_write_object_chunk_test);
RUN_TEST(object_notifications_test);
}
GREATEST_MAIN_DEFS();