Refactor state database (#22)

* make db_connect return the connection

* rename db_conn -> db_handle

* more renaming

* clang-format

* free the db_handle
This commit is contained in:
Philipp Moritz
2016-09-25 21:52:06 -07:00
committed by Robert Nishihara
parent 1e08629013
commit db8c0acc71
9 changed files with 67 additions and 69 deletions
+2 -2
View File
@@ -17,7 +17,7 @@ struct ray_logger_impl {
int log_level;
/* Whether or not we have a direct connection to Redis. */
int is_direct;
/* Either a db_conn or a socket to a process with a db_conn,
/* Either a db_handle or a socket to a process with a db_handle,
* depending on the is_direct flag. */
void *conn;
};
@@ -57,7 +57,7 @@ void ray_log(ray_logger *logger,
UT_string *origin_id;
utstring_new(origin_id);
if (logger->is_direct) {
db_conn *db = (db_conn *) logger->conn;
db_handle *db = (db_handle *) logger->conn;
utstring_printf(origin_id, "%ld:%s", db->client_id, "");
redisAsyncCommand(db->context, NULL, NULL, log_fmt,
utstring_body(timestamp), logger->client_type,
+11 -12
View File
@@ -3,22 +3,21 @@
#include "event_loop.h"
typedef struct db_conn_impl db_conn;
typedef struct db_handle_impl db_handle;
/* Connect to the global system store at address and port. The last
* parameter is an output parameter and we assume the memory is
* allocated by the caller. */
void db_connect(const char *db_address,
int db_port,
const char *client_type,
const char *client_addr,
int client_port,
db_conn *db);
/* Connect to the global system store at address and port. Returns
* a handle to the database, which must be freed with db_disconnect
* after use. */
db_handle *db_connect(const char *db_address,
int db_port,
const char *client_type,
const char *client_addr,
int client_port);
/* Attach global system store connection to event loop. */
void db_attach(db_conn *db, event_loop *loop);
void db_attach(db_handle *db, event_loop *loop);
/* Disconnect from the global system store. */
void db_disconnect(db_conn *db);
void db_disconnect(db_handle *db);
#endif
+5 -3
View File
@@ -10,12 +10,14 @@ typedef void (*lookup_callback)(object_id object_id,
/* Register a new object with the directory. */
/* TODO(pcm): Retry, print for each attempt. */
void object_table_add(db_conn *db, object_id object_id);
void object_table_add(db_handle *db, object_id object_id);
/* Remove object from the directory. */
void object_table_remove(db_conn *db, object_id object_id, const char *manager);
void object_table_remove(db_handle *db,
object_id object_id,
const char *manager);
/* Look up entry from the directory */
void object_table_lookup(db_conn *db,
void object_table_lookup(db_handle *db,
object_id object_id,
lookup_callback callback);
+14 -12
View File
@@ -31,12 +31,12 @@
} \
} while (0);
void db_connect(const char *address,
int port,
const char *client_type,
const char *client_addr,
int client_port,
db_conn *db) {
db_handle *db_connect(const char *address,
int port,
const char *client_type,
const char *client_addr,
int client_port) {
db_handle *db = malloc(sizeof(db_handle));
/* Sync connection for initial handshake */
redisReply *reply;
long long num_clients;
@@ -75,9 +75,10 @@ void db_connect(const char *address,
CHECK_REDIS_CONNECT(redisAsyncContext, db->context,
"could not connect to redis %s:%d", address, port);
db->context->data = (void *) db;
return db;
}
void db_disconnect(db_conn *db) {
void db_disconnect(db_handle *db) {
redisFree(db->sync_context);
redisAsyncFree(db->context);
service_cache_entry *e, *tmp;
@@ -87,13 +88,14 @@ void db_disconnect(db_conn *db) {
free(e);
}
free(db->client_type);
free(db);
}
void db_attach(db_conn *db, event_loop *loop) {
void db_attach(db_handle *db, event_loop *loop) {
redisAeAttach(loop, db->context);
}
void object_table_add(db_conn *db, unique_id object_id) {
void object_table_add(db_handle *db, unique_id object_id) {
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
sha1_to_hex(&object_id.id[0], &hex_object_id[0]);
redisAsyncCommand(db->context, NULL, NULL, "SADD obj:%s %d",
@@ -104,7 +106,7 @@ void object_table_add(db_conn *db, unique_id object_id) {
}
void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) {
db_conn *db = c->data;
db_handle *db = c->data;
lookup_callback_data *cb_data = privdata;
redisReply *reply = r;
if (reply == NULL)
@@ -143,7 +145,7 @@ void object_table_get_entry(redisAsyncContext *c, void *r, void *privdata) {
free(result);
}
void object_table_lookup(db_conn *db,
void object_table_lookup(db_handle *db,
object_id object_id,
lookup_callback callback) {
static char hex_object_id[2 * UNIQUE_ID_SIZE + 1];
@@ -158,7 +160,7 @@ void object_table_lookup(db_conn *db,
}
}
void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec *task) {
void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task) {
/* For converting an id to hex, which has double the number
* of bytes compared to the id (+ 1 byte for '\0'). */
static char hex[2 * UNIQUE_ID_SIZE + 1];
+1 -1
View File
@@ -17,7 +17,7 @@ typedef struct {
UT_hash_handle hh;
} service_cache_entry;
struct db_conn_impl {
struct db_handle_impl {
/* String that identifies this client type. */
char *client_type;
/* Unique ID for this client within the type. */
+3 -3
View File
@@ -21,13 +21,13 @@ typedef unique_id node_id;
typedef void (*task_queue_callback)(task_iid *task_iid, task_spec *task);
/* Submit task to the global scheduler. */
void task_queue_submit_task(db_conn *db, task_iid task_iid, task_spec *task);
void task_queue_submit_task(db_handle *db, task_iid task_iid, task_spec *task);
/* Submit task to a local scheduler based on the decision made by the global
* scheduler. */
void task_queue_schedule_task(db_conn *db, task_iid task_iid, node_id node);
void task_queue_schedule_task(db_handle *db, task_iid task_iid, node_id node);
/* Subscribe to task queue. */
void task_queue_register_callback(db_conn *db, task_queue_callback callback);
void task_queue_register_callback(db_handle *db, task_queue_callback callback);
#endif
+2 -2
View File
@@ -5,9 +5,9 @@
#include "task.h"
/* Add task to the task table, handle errors here. */
status task_table_add_task(db_conn *db, task_iid task_iid, task_spec *task);
status task_table_add_task(db_handle *db, task_iid task_iid, task_spec *task);
/* Get specific task from the task table. */
status task_table_get_task(db_conn *db, task_iid task_iid, task_spec *task);
status task_table_get_task(db_handle *db, task_iid task_iid, task_spec *task);
#endif /* TASK_TABLE_H */
+15 -18
View File
@@ -45,20 +45,18 @@ int64_t timeout_handler(event_loop *loop, int64_t id, void *context) {
TEST object_table_lookup_test(void) {
event_loop *loop = event_loop_create();
db_conn conn1;
db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port1,
&conn1);
db_conn conn2;
db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr, manager_port2,
&conn2);
db_attach(&conn1, loop);
db_attach(&conn2, loop);
db_handle *db1 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr,
manager_port1);
db_handle *db2 = db_connect("127.0.0.1", 6379, "plasma_manager", manager_addr,
manager_port2);
db_attach(db1, loop);
db_attach(db2, loop);
unique_id id = globally_unique_id();
object_table_add(&conn1, id);
object_table_add(&conn2, id);
object_table_add(db1, id);
object_table_add(db2, id);
event_loop_add_timer(loop, 100, timeout_handler, NULL);
event_loop_run(loop);
object_table_lookup(&conn1, id, test_callback);
object_table_lookup(db1, id, test_callback);
event_loop_add_timer(loop, 100, timeout_handler, NULL);
event_loop_run(loop);
int port1 = atoi(received_port1);
@@ -67,8 +65,8 @@ TEST object_table_lookup_test(void) {
ASSERT((port1 == manager_port1 && port2 == manager_port2) ||
(port2 == manager_port1 && port1 == manager_port2));
db_disconnect(&conn1);
db_disconnect(&conn2);
db_disconnect(db1);
db_disconnect(db2);
event_loop_destroy(loop);
PASS();
@@ -76,17 +74,16 @@ TEST object_table_lookup_test(void) {
TEST task_queue_test(void) {
event_loop *loop = event_loop_create();
db_conn conn;
db_connect("127.0.0.1", 6379, "local_scheduler", "", -1, &conn);
db_attach(&conn, loop);
db_handle *db = db_connect("127.0.0.1", 6379, "local_scheduler", "", -1);
db_attach(db, loop);
task_spec *task = example_task();
task_queue_submit_task(&conn, globally_unique_id(), task);
task_queue_submit_task(db, globally_unique_id(), task);
event_loop_add_timer(loop, 100, timeout_handler, NULL);
event_loop_run(loop);
free_task_spec(task);
db_disconnect(&conn);
db_disconnect(db);
event_loop_destroy(loop);
PASS();
}
+14 -16
View File
@@ -68,10 +68,10 @@ TEST redis_socket_test(void) {
}
void redis_read_callback(event_loop *loop, int fd, void *context, int events) {
db_conn *conn = context;
db_handle *db = context;
char *cmd = read_string(fd);
redisAsyncCommand(conn->context, async_redis_socket_test_callback, NULL, cmd,
conn->client_id, 0);
redisAsyncCommand(db->context, async_redis_socket_test_callback, NULL, cmd,
db->client_id, 0);
free(cmd);
}
@@ -102,9 +102,8 @@ TEST async_redis_socket_test(void) {
utarray_push_back(connections, &socket_fd);
/* Start connection to Redis. */
db_conn conn;
db_connect("127.0.0.1", 6379, "", "", 0, &conn);
db_attach(&conn, loop);
db_handle *db = db_connect("127.0.0.1", 6379, "", "", 0);
db_attach(db, loop);
/* Send a command to the Redis process. */
int client_fd = connect_ipc_sock(socket_pathname);
@@ -113,15 +112,15 @@ TEST async_redis_socket_test(void) {
write_formatted_string(client_fd, test_set_format, test_key, test_value);
event_loop_add_file(loop, client_fd, EVENT_LOOP_READ, redis_read_callback,
&conn);
db);
event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, redis_accept_callback,
&conn);
db);
event_loop_add_timer(loop, 100, timeout_handler, NULL);
event_loop_run(loop);
CHECK(async_redis_socket_test_callback_called);
db_disconnect(&conn);
db_disconnect(db);
event_loop_destroy(loop);
for (int *p = (int *) utarray_front(connections); p != NULL;
p = (int *) utarray_next(connections, p)) {
@@ -148,7 +147,7 @@ void logging_read_callback(event_loop *loop,
int fd,
void *context,
int events) {
db_conn *conn = context;
db_handle *conn = context;
char *cmd = read_string(fd);
redisAsyncCommand(conn->context, logging_test_callback, NULL, cmd,
conn->client_id, 0);
@@ -177,9 +176,8 @@ TEST logging_test(void) {
utarray_push_back(connections, &socket_fd);
/* Start connection to Redis. */
db_conn conn;
db_connect("127.0.0.1", 6379, "", "", 0, &conn);
db_attach(&conn, loop);
db_handle *conn = db_connect("127.0.0.1", 6379, "", "", 0);
db_attach(conn, loop);
/* Send a command to the Redis process. */
int client_fd = connect_ipc_sock(socket_pathname);
@@ -189,16 +187,16 @@ TEST logging_test(void) {
ray_log(logger, RAY_INFO, "TEST", "Message");
event_loop_add_file(loop, socket_fd, EVENT_LOOP_READ, logging_accept_callback,
&conn);
conn);
event_loop_add_file(loop, client_fd, EVENT_LOOP_READ, logging_read_callback,
&conn);
conn);
event_loop_add_timer(loop, 100, timeout_handler, NULL);
event_loop_run(loop);
CHECK(logging_test_callback_called);
free_ray_logger(logger);
db_disconnect(&conn);
db_disconnect(conn);
event_loop_destroy(loop);
for (int *p = (int *) utarray_front(connections); p != NULL;
p = (int *) utarray_next(connections, p)) {