diff --git a/logging.c b/logging.c index 38abc7bf9..bf37f1ee5 100644 --- a/logging.c +++ b/logging.c @@ -17,7 +17,7 @@ struct ray_logger_impl { int log_level; /* Whether or not we have a direct connection to Redis. */ int is_direct; - /* Either a db_conn or a socket to a process with a db_conn, + /* Either a db_handle or a socket to a process with a db_handle, * depending on the is_direct flag. */ void *conn; }; @@ -57,7 +57,7 @@ void ray_log(ray_logger *logger, UT_string *origin_id; utstring_new(origin_id); if (logger->is_direct) { - db_conn *db = (db_conn *) logger->conn; + db_handle *db = (db_handle *) logger->conn; utstring_printf(origin_id, "%ld:%s", db->client_id, ""); redisAsyncCommand(db->context, NULL, NULL, log_fmt, utstring_body(timestamp), logger->client_type, diff --git a/state/db.h b/state/db.h index 3fcf658fd..e6ca089a0 100644 --- a/state/db.h +++ b/state/db.h @@ -3,22 +3,21 @@ #include "event_loop.h" -typedef struct db_conn_impl db_conn; +typedef struct db_handle_impl db_handle; -/* Connect to the global system store at address and port. The last - * parameter is an output parameter and we assume the memory is - * allocated by the caller. */ -void db_connect(const char *db_address, - int db_port, - const char *client_type, - const char *client_addr, - int client_port, - db_conn *db); +/* Connect to the global system store at address and port. Returns + * a handle to the database, which must be freed with db_disconnect + * after use. */ +db_handle *db_connect(const char *db_address, + int db_port, + const char *client_type, + const char *client_addr, + int client_port); /* Attach global system store connection to event loop. */ -void db_attach(db_conn *db, event_loop *loop); +void db_attach(db_handle *db, event_loop *loop); /* Disconnect from the global system store. */ -void db_disconnect(db_conn *db); +void db_disconnect(db_handle *db); #endif diff --git a/state/object_table.h b/state/object_table.h index 7c00ab2ba..e2eb89433 100644 --- a/state/object_table.h +++ b/state/object_table.h @@ -10,12 +10,14 @@ typedef void (*lookup_callback)(object_id object_id, /* Register a new object with the directory. */ /* TODO(pcm): Retry, print for each attempt. */ -void object_table_add(db_conn *db, object_id object_id); +void object_table_add(db_handle *db, object_id object_id); /* Remove object from the directory. */ -void object_table_remove(db_conn *db, object_id object_id, const char *manager); +void object_table_remove(db_handle *db, + object_id object_id, + const char *manager); /* Look up entry from the directory */ -void object_table_lookup(db_conn *db, +void object_table_lookup(db_handle *db, object_id object_id, lookup_callback callback); diff --git a/state/redis.c b/state/redis.c index d10b06b75..aa2011d9a 100644 --- a/state/redis.c +++ b/state/redis.c @@ -31,12 +31,12 @@ } \ } while (0); -void db_connect(const char *address, - int port, - const char *client_type, - const char *client_addr, - int client_port, - db_conn *db) { +db_handle *db_connect(const char *address, + int port, + const char *client_type, + const char *client_addr, + int client_port) { + db_handle *db = malloc(sizeof(db_handle)); /* Sync connection for initial handshake */ redisReply *reply; long long num_clients; @@ -75,9 +75,10 @@ void db_connect(const char *address, CHECK_REDIS_CONNECT(redisAsyncContext, db->context, "could not connect to redis %s:%d", address, port); db->context->data = (void *) db; + return db; } -void db_disconnect(db_conn *db) { +void db_disconnect(db_handle *db) { redisFree(db->sync_context); redisAsyncFree(db->context); service_cache_entry *e, *tmp; @@ -87,13 +88,14 @@ void db_disconnect(db_conn *db) { free(e); } free(db->client_type); + free(db); } -void db_attach(db_conn *db, event_loop *loop) { +void db_attach(db_handle *db, event_loop *loop) { redisAeAttach(loop, db->context); } -void object_table_add(db_conn *db, unique_id object_id) { +void object_table_add(db_handle *db, unique_id object_id) { static char hex_object_id[2 * UNIQUE_ID_SIZE + 1]; sha1_to_hex(&object_id.id[0], &hex_object_id[0]); redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%s %d", @@ -104,7 +106,7 @@ void object_table_add(db_conn *db, unique_id object_id) { } void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { - db_conn *db = c->data; + db_handle *db = c->data; lookup_callback_data *cb_data = privdata; redisReply *reply = r; if (reply == NULL) @@ -143,7 +145,7 @@ void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) { free(result); } -void object_table_lookup(db_conn *db, +void object_table_lookup(db_handle *db, object_id object_id, lookup_callback callback) { static char hex_object_id[2 * UNIQUE_ID_SIZE + 1]; @@ -158,7 +160,7 @@ void object_table_lookup(db_conn *db, } } -void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec *task) { +void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task) { /* For converting an id to hex, which has double the number * of bytes compared to the id (+ 1 byte for '\0'). */ static char hex[2 * UNIQUE_ID_SIZE + 1]; diff --git a/state/redis.h b/state/redis.h index c579e7065..23ebb2ba6 100644 --- a/state/redis.h +++ b/state/redis.h @@ -17,7 +17,7 @@ typedef struct { UT_hash_handle hh; } service_cache_entry; -struct db_conn_impl { +struct db_handle_impl { /* String that identifies this client type. */ char *client_type; /* Unique ID for this client within the type. */ diff --git a/state/task_queue.h b/state/task_queue.h index 0226c501b..92968707e 100644 --- a/state/task_queue.h +++ b/state/task_queue.h @@ -21,13 +21,13 @@ typedef unique_id node_id; typedef void (*task_queue_callback)(task_iid *task_iid, task_spec *task); /* Submit task to the global scheduler. */ -void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec *task); +void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task); /* Submit task to a local scheduler based on the decision made by the global * scheduler. */ -void task_queue_schedule_task(db_conn *db, task_iid task_iid, node_id node); +void task_queue_schedule_task(db_handle *db, task_iid task_iid, node_id node); /* Subscribe to task queue. */ -void task_queue_register_callback(db_conn *db, task_queue_callback callback); +void task_queue_register_callback(db_handle *db, task_queue_callback callback); #endif diff --git a/state/task_table.h b/state/task_table.h index 64285da67..71e879c2c 100644 --- a/state/task_table.h +++ b/state/task_table.h @@ -5,9 +5,9 @@ #include "task.h" /* Add task to the task table, handle errors here. */ -status task_table_add_task(db_conn *db, task_iid task_iid, task_spec *task); +status task_table_add_task(db_handle *db, task_iid task_iid, task_spec *task); /* Get specific task from the task table. */ -status task_table_get_task(db_conn *db, task_iid task_iid, task_spec *task); +status task_table_get_task(db_handle *db, task_iid task_iid, task_spec *task); #endif /* TASK_TABLE_H */ diff --git a/test/db_tests.c b/test/db_tests.c index d9dfcb563..99fad7e18 100644 --- a/test/db_tests.c +++ b/test/db_tests.c @@ -45,20 +45,18 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) { TEST object_table_lookup_test(void) { event_loop *loop = event_loop_create(); - db_conn conn1; - db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port1, - &conn1); - db_conn conn2; - db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port2, - &conn2); - db_attach(&conn1, loop); - db_attach(&conn2, loop); + db_handle *db1 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, + manager_port1); + db_handle *db2 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, + manager_port2); + db_attach(db1, loop); + db_attach(db2, loop); unique_id id = globally_unique_id(); - object_table_add(&conn1, id); - object_table_add(&conn2, id); + object_table_add(db1, id); + object_table_add(db2, id); event_loop_add_timer(loop, 100, timeout_handler, NULL); event_loop_run(loop); - object_table_lookup(&conn1, id, test_callback); + object_table_lookup(db1, id, test_callback); event_loop_add_timer(loop, 100, timeout_handler, NULL); event_loop_run(loop); int port1 = atoi(received_port1); @@ -67,8 +65,8 @@ TEST object_table_lookup_test(void) { ASSERT((port1 == manager_port1 && port2 == manager_port2) || (port2 == manager_port1 && port1 == manager_port2)); - db_disconnect(&conn1); - db_disconnect(&conn2); + db_disconnect(db1); + db_disconnect(db2); event_loop_destroy(loop); PASS(); @@ -76,17 +74,16 @@ TEST object_table_lookup_test(void) { TEST task_queue_test(void) { event_loop *loop = event_loop_create(); - db_conn conn; - db_connect("127.0.0.1", 6379, "local_scheduler", "", -1, &conn); - db_attach(&conn, loop); + db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1); + db_attach(db, loop); task_spec *task = example_task(); - task_queue_submit_task(&conn, globally_unique_id(), task); + task_queue_submit_task(db, globally_unique_id(), task); event_loop_add_timer(loop, 100, timeout_handler, NULL); event_loop_run(loop); free_task_spec(task); - db_disconnect(&conn); + db_disconnect(db); event_loop_destroy(loop); PASS(); } diff --git a/test/redis_tests.c b/test/redis_tests.c index d527d0476..b1cde98b6 100644 --- a/test/redis_tests.c +++ b/test/redis_tests.c @@ -68,10 +68,10 @@ TEST redis_socket_test(void) { } void redis_read_callback(event_loop *loop, int fd, void *context, int events) { - db_conn *conn = context; + db_handle *db = context; char *cmd = read_string(fd); - redisAsyncCommand(conn->context, async_redis_socket_test_callback, NULL, cmd, - conn->client_id, 0); + redisAsyncCommand(db->context, async_redis_socket_test_callback, NULL, cmd, + db->client_id, 0); free(cmd); } @@ -102,9 +102,8 @@ TEST async_redis_socket_test(void) { utarray_push_back(connections, &socket_fd); /* Start connection to Redis. */ - db_conn conn; - db_connect("127.0.0.1", 6379, "", "", 0, &conn); - db_attach(&conn, loop); + db_handle *db = db_connect("127.0.0.1", 6379, "", "", 0); + db_attach(db, loop); /* Send a command to the Redis process. */ int client_fd = connect_ipc_sock(socket_pathname); @@ -113,15 +112,15 @@ TEST async_redis_socket_test(void) { write_formatted_string(client_fd, test_set_format, test_key, test_value); event_loop_add_file(loop, client_fd, EVENT_LOOP_READ, redis_read_callback, - &conn); + db); event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, redis_accept_callback, - &conn); + db); event_loop_add_timer(loop, 100, timeout_handler, NULL); event_loop_run(loop); CHECK(async_redis_socket_test_callback_called); - db_disconnect(&conn); + db_disconnect(db); event_loop_destroy(loop); for (int *p = (int *) utarray_front(connections); p != NULL; p = (int *) utarray_next(connections, p)) { @@ -148,7 +147,7 @@ void logging_read_callback(event_loop *loop, int fd, void *context, int events) { - db_conn *conn = context; + db_handle *conn = context; char *cmd = read_string(fd); redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd, conn->client_id, 0); @@ -177,9 +176,8 @@ TEST logging_test(void) { utarray_push_back(connections, &socket_fd); /* Start connection to Redis. */ - db_conn conn; - db_connect("127.0.0.1", 6379, "", "", 0, &conn); - db_attach(&conn, loop); + db_handle *conn = db_connect("127.0.0.1", 6379, "", "", 0); + db_attach(conn, loop); /* Send a command to the Redis process. */ int client_fd = connect_ipc_sock(socket_pathname); @@ -189,16 +187,16 @@ TEST logging_test(void) { ray_log(logger, RAY_INFO, "TEST", "Message"); event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, logging_accept_callback, - &conn); + conn); event_loop_add_file(loop, client_fd, EVENT_LOOP_READ, logging_read_callback, - &conn); + conn); event_loop_add_timer(loop, 100, timeout_handler, NULL); event_loop_run(loop); CHECK(logging_test_callback_called); free_ray_logger(logger); - db_disconnect(&conn); + db_disconnect(conn); event_loop_destroy(loop); for (int *p = (int *) utarray_front(connections); p != NULL; p = (int *) utarray_next(connections, p)) {