From f89be9699c8ba8d3c519d5d15a52ec79bc0c1f00 Mon Sep 17 00:00:00 2001 From: Ion Date: Thu, 1 Dec 2016 02:15:21 -0800 Subject: [PATCH] Introduce non-blocking Plasma API. (#71) * Implement new plasma client API. * Formatting fixes. * Make tests work again. * Make tests run. * Comment style. * Fix bugs with fetch tests. * Introduce fetch1 flag. * Remove timer only if present. * Formatting fixes. * Don't access object after free. * Formatting fixes. * Minor change. * refactoring plasma datastructures * Change plasma_request and plasma_reply to use only arrays of object requests. * some more fixes * Remove unnecessary methods. * Trivial. * fixes * use plasma_send_reply in return_from_wait1 * Lint. --- src/common/common.h | 3 + src/common/io.c | 2 +- src/common/state/object_table.c | 3 + src/common/state/redis.c | 2 +- src/plasma/Makefile | 6 +- src/plasma/plasma.c | 15 +- src/plasma/plasma.h | 108 +++- src/plasma/plasma_client.c | 412 ++++++++++++- src/plasma/plasma_client.h | 271 ++++++++- src/plasma/plasma_manager.c | 895 +++++++++++++++++++++++----- src/plasma/plasma_manager.h | 69 ++- src/plasma/plasma_store.c | 84 ++- src/plasma/plasma_store.h | 22 +- src/plasma/test/client_tests.c | 379 ++++++++++++ src/plasma/test/manager_tests.c | 6 +- src/plasma/test/run_client_tests.sh | 17 + 16 files changed, 2053 insertions(+), 241 deletions(-) create mode 100644 src/plasma/test/client_tests.c create mode 100755 src/plasma/test/run_client_tests.sh diff --git a/src/common/common.h b/src/common/common.h index 80bb10582..47fb088ce 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -149,4 +149,7 @@ typedef unique_id db_client_id; */ bool db_client_ids_equal(db_client_id first_id, db_client_id second_id); +#define MAX(x, y) ((x) >= (y) ? (x) : (y)) +#define MIN(x, y) ((x) <= (y) ? (x) : (y)) + #endif diff --git a/src/common/io.c b/src/common/io.c index 073712e02..5ec162544 100644 --- a/src/common/io.c +++ b/src/common/io.c @@ -250,7 +250,7 @@ int read_bytes(int fd, uint8_t *cursor, size_t length) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; } - return -1; + return -1; /* Errno will be set. */ } else if (0 == nbytes) { /* Encountered early EOF. */ return -1; diff --git a/src/common/state/object_table.c b/src/common/state/object_table.c index 48a9db0ff..108011745 100644 --- a/src/common/state/object_table.c +++ b/src/common/state/object_table.c @@ -6,6 +6,7 @@ void object_table_lookup(db_handle *db_handle, retry_info *retry, object_table_lookup_done_callback done_callback, void *user_context) { + CHECK(db_handle != NULL); init_table_callback(db_handle, object_id, __func__, NULL, retry, done_callback, redis_object_table_lookup, user_context); } @@ -15,6 +16,7 @@ void object_table_add(db_handle *db_handle, retry_info *retry, object_table_done_callback done_callback, void *user_context) { + CHECK(db_handle != NULL); init_table_callback(db_handle, object_id, __func__, NULL, retry, done_callback, redis_object_table_add, user_context); } @@ -27,6 +29,7 @@ void object_table_subscribe( retry_info *retry, object_table_done_callback done_callback, void *user_context) { + CHECK(db_handle != NULL); object_table_subscribe_data *sub_data = malloc(sizeof(object_table_subscribe_data)); utarray_push_back(db_handle->callback_freelist, &sub_data); diff --git a/src/common/state/redis.c b/src/common/state/redis.c index 47822f86f..3e59ff26c 100644 --- a/src/common/state/redis.c +++ b/src/common/state/redis.c @@ -402,7 +402,7 @@ void redis_object_table_subscribe_lookup(redisAsyncContext *c, if (reply->elements > 0) { CHECK(reply->element[0]->len == UNIQUE_ID_SIZE); /* Check that the reply corresponds to the right object ID. */ - CHECK(strncmp(reply->element[0]->str, callback_data->id.id, + CHECK(strncmp(reply->element[0]->str, (char *) callback_data->id.id, UNIQUE_ID_SIZE)); object_table_subscribe_data *data = callback_data->data; if (data->object_available_callback) { diff --git a/src/plasma/Makefile b/src/plasma/Makefile index 53f36b516..11a116495 100644 --- a/src/plasma/Makefile +++ b/src/plasma/Makefile @@ -16,6 +16,9 @@ clean: $(BUILD)/manager_tests: test/manager_tests.c plasma.h plasma.c plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common $(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/manager_tests.c plasma.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a +$(BUILD)/client_tests: test/client_tests.c plasma.h plasma.c plasma_client.h plasma_client.c plasma_manager.h plasma_manager.c fling.h fling.c common + $(CC) $(CFLAGS) $(TEST_CFLAGS) -o $@ test/client_tests.c plasma.c plasma_manager.c plasma_client.c fling.c ../common/build/libcommon.a ../common/thirdparty/hiredis/libhiredis.a + $(BUILD)/plasma_store: plasma_store.c plasma.h plasma.c eviction_policy.c fling.h fling.c malloc.c malloc.h thirdparty/dlmalloc.c common $(CC) $(CFLAGS) plasma_store.c plasma.c eviction_policy.c fling.c malloc.c ../common/build/libcommon.a -o $(BUILD)/plasma_store @@ -37,8 +40,9 @@ common: FORCE # Set the request timeout low and logging level at FATAL for testing purposes. test: CFLAGS += -DRAY_TIMEOUT=50 -DRAY_COMMON_LOG_LEVEL=4 # First, build and run all the unit tests. -test: $(BUILD)/manager_tests FORCE +test: $(BUILD)/manager_tests $(BUILD)/client_tests FORCE ./build/manager_tests + ./test/run_client_tests.sh cd ../common; make redis # Next, build all the executables for Python testing. test: all diff --git a/src/plasma/plasma.c b/src/plasma/plasma.c index ac9f09188..b6ad44768 100644 --- a/src/plasma/plasma.c +++ b/src/plasma/plasma.c @@ -9,18 +9,16 @@ plasma_request plasma_make_request(object_id object_id) { plasma_request request; memset(&request, 0, sizeof(request)); request.num_object_ids = 1; - request.object_ids[0] = object_id; + request.object_requests[0].object_id = object_id; return request; } -plasma_request *plasma_alloc_request(int num_object_ids, - object_id object_ids[]) { +plasma_request *plasma_alloc_request(int num_object_ids) { DCHECK(num_object_ids >= 1); int req_size = plasma_request_size(num_object_ids); plasma_request *req = malloc(req_size); memset(req, 0, req_size); req->num_object_ids = num_object_ids; - memcpy(&req->object_ids, object_ids, num_object_ids * sizeof(object_ids[0])); return req; } @@ -29,7 +27,7 @@ void plasma_free_request(plasma_request *request) { } int64_t plasma_request_size(int num_object_ids) { - int64_t object_ids_size = (num_object_ids - 1) * sizeof(object_id); + int64_t object_ids_size = (num_object_ids - 1) * sizeof(object_request); return sizeof(plasma_request) + object_ids_size; } @@ -37,7 +35,7 @@ plasma_reply plasma_make_reply(object_id object_id) { plasma_reply reply; memset(&reply, 0, sizeof(reply)); reply.num_object_ids = 1; - reply.object_ids[0] = object_id; + reply.object_requests[0].object_id = object_id; return reply; } @@ -56,7 +54,7 @@ void plasma_free_reply(plasma_reply *reply) { int64_t plasma_reply_size(int num_object_ids) { DCHECK(num_object_ids >= 1); - return sizeof(plasma_reply) + (num_object_ids - 1) * sizeof(object_id); + return sizeof(plasma_reply) + (num_object_ids - 1) * sizeof(object_request); } int plasma_send_reply(int sock, plasma_reply *reply) { @@ -82,7 +80,8 @@ int plasma_receive_request(int sock, int64_t *type, plasma_request **request) { if (*request == NULL) { return *type == DISCONNECT_CLIENT; } - return length == plasma_request_size((*request)->num_object_ids) ? 0 : -1; + int req_size = plasma_request_size((*request)->num_object_ids); + return length == req_size ? 0 : -1; } bool plasma_object_ids_distinct(int num_object_ids, object_id object_ids[]) { diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index 58336fd12..25586a71f 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include /* pid_t */ @@ -21,11 +22,34 @@ typedef struct { int64_t construct_duration; } plasma_object_info; -/* Handle to access memory mapped file and map it into client address space */ +/** + * Object request data structure. Used in the plasma_wait_for_objects() + * argument. + */ typedef struct { - /** The file descriptor of the memory mapped file in the store. It is used - * as a unique identifier of the file in the client to look up the - * corresponding file descriptor on the client's side. */ + /** The ID of the requested object. If ID_NIL request any object. */ + object_id object_id; + /** Request associated to the object. It can take one of the following values: + * - PLASMA_QUERY_LOCAL: return if or when the object is available in the + * local Plasma Store. + * - PLASMA_QUERY_ANYWHERE: return if or when the object is available in + * the system (i.e., either in the local or a remote Plasma Store). */ + int type; + /** Object status. Same as the status returned by plasma_status() function + * call. This is filled in by plasma_wait_for_objects1(): + * - PLASMA_OBJECT_LOCAL: object is ready at the local Plasma Store. + * - PLASMA_OBJECT_REMOTE: object is ready at a remote Plasma Store. + * - PLASMA_OBJECT_NONEXISTENT: object does not exist in the system. + * - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled + * for being transferred or it is transferring. */ + int status; +} object_request; + +/* Handle to access memory mapped file and map it into client address space. */ +typedef struct { + /** The file descriptor of the memory mapped file in the store. It is used as + * a unique identifier of the file in the client to look up the corresponding + * file descriptor on the client's side. */ int store_fd; /** The size in bytes of the memory mapped file. */ int64_t mmap_size; @@ -44,15 +68,47 @@ typedef struct { int64_t metadata_size; } plasma_object; -enum object_status { OBJECT_NOT_FOUND = 0, OBJECT_FOUND = 1 }; +typedef enum { + /** Object was created but not sealed in the local Plasma Store. */ + PLASMA_CREATED = 1, + /** Object is sealed and stored in the local Plasma Store. */ + PLASMA_SEALED +} object_state; -typedef enum { OPEN, SEALED } object_state; +typedef enum { + /** The object was not found. */ + OBJECT_NOT_FOUND = 0, + /** The object was found. */ + OBJECT_FOUND = 1 +} object_status; + +typedef enum { + /** Object is stored in the local Plasma Store. */ + PLASMA_OBJECT_LOCAL = 1, + /** Object is stored on a remote Plasma store, and it is not stored on the + * local Plasma Store. */ + PLASMA_OBJECT_REMOTE, + /** Object is currently transferred from a remote Plasma store the the local + * Plasma Store. */ + PLASMA_OBJECT_IN_TRANSFER, + /** Object is not stored in the system. */ + PLASMA_OBJECT_NONEXISTENT +} object_status1; + +typedef enum { + /** Query for object in the local plasma store. */ + PLASMA_QUERY_LOCAL = 1, + /** Query for object in the local plasma store or in a remote plasma store. */ + PLASMA_QUERY_ANYWHERE +} object_request_type; enum plasma_message_type { /** Create a new object. */ PLASMA_CREATE = 128, /** Get an object. */ PLASMA_GET, + /** Get an object stored at the local Plasma Store. */ + PLASMA_GET_LOCAL, /** Tell the store that the client no longer needs an object. */ PLASMA_RELEASE, /** Check if an object is present. */ @@ -69,10 +125,18 @@ enum plasma_message_type { PLASMA_TRANSFER, /** Header for sending data. */ PLASMA_DATA, - /** Request a fetch of an object in another store. */ + /** Request a fetch of an object in another store. Non-blocking call. */ + PLASMA_FETCH_REMOTE, + /** Request a fetch of an object in another store. Blocking call. */ PLASMA_FETCH, + /** Request status of an object, i.e., whether the object is stored in the + * local Plasma Store, in a remote Plasma Store, in transfer, or doesn't + * exist in the system. */ + PLASMA_STATUS, /** Wait until an object becomes available. */ - PLASMA_WAIT + PLASMA_WAIT, + /** Wait until an object becomes available. */ + PLASMA_WAIT1 }; typedef struct { @@ -82,8 +146,8 @@ typedef struct { int64_t metadata_size; /** The timeout of the request. */ uint64_t timeout; - /** The number of objects we wait for for wait. */ - int num_returns; + /** The number of objects we are waiting for to be ready. */ + int num_ready_objects; /** In a transfer request, this is the IP address of the Plasma Manager to * transfer the object to. */ uint8_t addr[4]; @@ -94,13 +158,22 @@ typedef struct { int64_t num_bytes; /** The number of object IDs that will be included in this request. */ int num_object_ids; - /** The IDs of the objects that the request is about. */ - object_id object_ids[1]; + /** The object requests that the request is about. */ + object_request object_requests[1]; } plasma_request; +typedef enum { + /** There is no error. */ + PLASMA_REPLY_OK = 1, + /** The object already exists. */ + PLASMA_OBJECT_ALREADY_EXISTS +} plasma_error; + typedef struct { /** The object that is returned with this reply. */ plasma_object object; + /** TODO: document this. */ + int object_status; /** This is used only to respond to requests of type * PLASMA_CONTAINS or PLASMA_FETCH. It is 1 if the object is * present and 0 otherwise. Used for plasma_contains and @@ -112,8 +185,10 @@ typedef struct { int num_objects_returned; /** The number of object IDs that will be included in this reply. */ int num_object_ids; - /** The IDs of the objects that this reply refers to. */ - object_id object_ids[1]; + /** The object requests that this reply refers to. */ + object_request object_requests[1]; + /** Return error code. */ + plasma_error error_code; } plasma_reply; /** This type is used by the Plasma store. It is here because it is exposed to @@ -158,12 +233,9 @@ plasma_request plasma_make_request(object_id object_id); * must free the returned plasma request pointer with plasma_free_request. * * @param num_object_ids The number of object IDs to include in the request. - * @param object_ids The array of object IDs to include in the request. It must - * have length at least equal to num_object_ids. * @return A pointer to the newly created plasma request. */ -plasma_request *plasma_alloc_request(int num_object_ids, - object_id object_ids[]); +plasma_request *plasma_alloc_request(int num_object_ids); /** * Free a plasma request. diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index a441351c0..53bd3074c 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -10,12 +10,15 @@ #include #include #include +#include #include #include #include #include #include +#include #include +#include #include "common.h" #include "io.h" @@ -78,6 +81,10 @@ struct plasma_connection { int store_conn; /** File descriptor of the Unix domain socket that connects to the manager. */ int manager_conn; + /** File descriptor of the Unix domain socket on which client receives event + * notifications for the objects it subscribes for when these objects are + * sealed either locally or remotely. */ + int manager_conn_subscribe; /** Table of dlmalloc buffer files that have been memory mapped so far. This * is a hash table mapping a file descriptor to a struct containing the * address of the corresponding memory-mapped file. */ @@ -85,10 +92,10 @@ struct plasma_connection { /** A hash table of the object IDs that are currently being used by this * client. */ object_in_use_entry *objects_in_use; - /** Object IDs of the last few release calls. This is used to delay - * releasing objects to see if they can be reused by subsequent tasks so we - * do not unneccessarily invalidate cpu caches. TODO(pcm): replace this with - * a proper lru cache of size sizeof(L3 cache). */ + /** Object IDs of the last few release calls. This is used to delay releasing + * objects to see if they can be reused by subsequent tasks so we do not + * unneccessarily invalidate cpu caches. TODO(pcm): replace this with a + * proper lru cache of size sizeof(L3 cache). */ UT_ringbuffer *release_history; /** Configuration options for the plasma client. */ plasma_client_config config; @@ -157,15 +164,14 @@ void increment_object_count(plasma_connection *conn, object_entry->count += 1; } -void plasma_create(plasma_connection *conn, +bool plasma_create(plasma_connection *conn, object_id object_id, int64_t data_size, uint8_t *metadata, int64_t metadata_size, uint8_t **data) { LOG_DEBUG("called plasma_create on conn %d with size %" PRId64 - " and metadata size " - "%" PRId64, + " and metadata size %" PRId64, conn->store_conn, data_size, metadata_size); plasma_request req = plasma_make_request(object_id); req.data_size = data_size; @@ -174,7 +180,11 @@ void plasma_create(plasma_connection *conn, plasma_reply reply; CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0); int fd = recv_fd(conn->store_conn); - CHECK(fd >= 0); + CHECKM(fd >= 0, "recv not successful"); + if (reply.error_code == PLASMA_OBJECT_ALREADY_EXISTS) { + LOG_DEBUG("returned from plasma_create with error %d", reply.error_code); + return false; + } plasma_object *object = &reply.object; CHECK(object->data_size == data_size); CHECK(object->metadata_size == metadata_size); @@ -194,6 +204,7 @@ void plasma_create(plasma_connection *conn, * client is using. A call to plasma_release is required to decrement this * count. */ increment_object_count(conn, object_id, object->handle.store_fd); + return true; } /* This method is used to get both the data and the metadata. */ @@ -461,30 +472,24 @@ void plasma_fetch(plasma_connection *conn, /* Make sure that there are no duplicated object IDs. TODO(rkn): we should * allow this case in the future. */ CHECK(plasma_object_ids_distinct(num_object_ids, object_ids)); - plasma_request *req = plasma_alloc_request(num_object_ids, object_ids); + plasma_request *req = plasma_alloc_request(num_object_ids); + for (int i = 0; i < num_object_ids; ++i) { + req->object_requests[i].object_id = object_ids[i]; + } LOG_DEBUG("Requesting fetch"); CHECK(plasma_send_request(conn->manager_conn, PLASMA_FETCH, req) >= 0); free(req); plasma_reply reply; - int nbytes, success; + int success; for (int received = 0; received < num_object_ids; ++received) { - nbytes = recv(conn->manager_conn, (uint8_t *) &reply, sizeof(reply), - MSG_WAITALL); - if (nbytes < 0) { - LOG_ERROR("Error while waiting for manager response in fetch"); - success = 0; - } else if (nbytes == 0) { - success = 0; - } else { - CHECK(nbytes == sizeof(reply)); - success = reply.has_object; - } + CHECK(plasma_receive_reply(conn->manager_conn, sizeof(reply), &reply) >= 0); CHECK(reply.num_object_ids == 1); + success = reply.has_object; /* Update the correct index in is_fetched. */ int i = 0; - for (; i < num_object_ids; i++) { - if (object_ids_equal(object_ids[i], reply.object_ids[0]) && + for (; i < num_object_ids; ++i) { + if (object_ids_equal(object_ids[i], reply.object_requests[0].object_id) && !is_fetched[i]) { is_fetched[i] = success; break; @@ -503,15 +508,20 @@ int plasma_wait(plasma_connection *conn, int num_returns, object_id return_object_ids[]) { CHECK(conn->manager_conn >= 0); - plasma_request *req = plasma_alloc_request(num_object_ids, object_ids); - req->num_returns = num_returns; + plasma_request *req = plasma_alloc_request(num_object_ids); + for (int i = 0; i < num_object_ids; ++i) { + req->object_requests[i].object_id = object_ids[i]; + } + req->num_ready_objects = num_returns; req->timeout = timeout; CHECK(plasma_send_request(conn->manager_conn, PLASMA_WAIT, req) >= 0); plasma_free_request(req); int64_t return_size = plasma_reply_size(num_returns); plasma_reply *reply = malloc(return_size); CHECK(plasma_receive_reply(conn->manager_conn, return_size, reply) >= 0); - memcpy(return_object_ids, reply->object_ids, num_returns * sizeof(object_id)); + for (int i = 0; i < num_returns; ++i) { + return_object_ids[i] = reply->object_requests[i].object_id; + } int num_objects_returned = reply->num_objects_returned; free(reply); return num_objects_returned; @@ -520,3 +530,353 @@ int plasma_wait(plasma_connection *conn, int get_manager_fd(plasma_connection *conn) { return conn->manager_conn; } + +/** === ALTERNATE PLASMA CLIENT API === + + * This API simplifies the previous one in two ways. First if factors out + * object (re)construction from the Plasma Manager. Second, except for + * plasma_wait_for_objects() all other functions are non-blocking. + * + * TODO: + * - plasma_info() not implemented yet, but not needed at this point. + * - assume new implementation of object_table_subscribe() which returns + * if object is in the Local Store (check with jpm). + * - need to phase out old API and drope *1 from the names of the functions + * once the old ones are dropped. + */ + +bool plasma_get_local(plasma_connection *conn, + object_id object_id, + object_buffer *object_buffer) { + CHECK(conn != NULL); + + plasma_request req = plasma_make_request(object_id); + CHECK(plasma_send_request(conn->store_conn, PLASMA_GET_LOCAL, &req) >= 0); + + plasma_reply reply; + CHECK(plasma_receive_reply(conn->store_conn, sizeof(reply), &reply) >= 0); + int fd = recv_fd(conn->store_conn); + CHECKM(fd >= 0, "recv_fd not successful"); + + if (reply.has_object) { + plasma_object *object = &reply.object; + object_buffer->data = lookup_or_mmap(conn, fd, object->handle.store_fd, + object->handle.mmap_size) + + object->data_offset; + object_buffer->data_size = object->data_size; + object_buffer->metadata = object_buffer->data + object->data_size; + object_buffer->metadata_size = object->metadata_size; + + /* Increment the count of the number of instances of this object that this + * client is using. A call to plasma_release is required to decrement this + * count. */ + increment_object_count(conn, object_id, object->handle.store_fd); + return true; + } + /* The object is either (1) not available in the local Plasma store, or (2) it + * is not sealed yet. */ + return false; +} + +int plasma_fetch_remote(plasma_connection *conn, object_id object_id) { + CHECK(conn != NULL); + CHECK(conn->manager_conn >= 0); + + plasma_request req = plasma_make_request(object_id); + CHECK(plasma_send_request(conn->manager_conn, PLASMA_FETCH_REMOTE, &req) >= + 0); + + plasma_reply reply; + CHECK(plasma_receive_reply(conn->manager_conn, sizeof(reply), &reply) >= 0); + + return reply.object_status; +} + +int plasma_status(plasma_connection *conn, object_id object_id) { + CHECK(conn != NULL); + CHECK(conn->manager_conn >= 0); + + plasma_request req = plasma_make_request(object_id); + CHECK(plasma_send_request(conn->manager_conn, PLASMA_STATUS, &req) >= 0); + + plasma_reply reply; + CHECK(plasma_receive_reply(conn->manager_conn, sizeof(reply), &reply) >= 0); + + return reply.object_status; +} + +int plasma_wait_for_objects(plasma_connection *conn, + int num_object_requests, + object_request object_requests[], + int num_ready_objects, + uint64_t timeout_ms) { + CHECK(conn != NULL); + CHECK(conn->manager_conn >= 0); + CHECK(num_object_requests > 0); + + plasma_request *req = plasma_alloc_request(num_object_requests); + for (int i = 0; i < num_object_requests; ++i) { + req->object_requests[i] = object_requests[i]; + } + req->num_ready_objects = num_ready_objects; + req->timeout = timeout_ms; + CHECK(plasma_send_request(conn->manager_conn, PLASMA_WAIT1, req) >= 0); + free(req); + + plasma_reply *reply = plasma_alloc_reply(num_object_requests); + CHECK(plasma_receive_reply(conn->manager_conn, + plasma_reply_size(num_object_requests), + reply) >= 0); + int num_objects_ready = 0; + for (int i = 0; i < num_object_requests; ++i) { + int type, status; + object_requests[i].object_id = reply->object_requests[i].object_id; + type = reply->object_requests[i].type; + object_requests[i].type = type; + status = reply->object_requests[i].status; + object_requests[i].status = status; + + if (type == PLASMA_QUERY_LOCAL) { + if (status == PLASMA_OBJECT_LOCAL) { + num_objects_ready += 1; + } + } else { + CHECK(type == PLASMA_QUERY_ANYWHERE); + if (status == PLASMA_OBJECT_LOCAL || status == PLASMA_OBJECT_REMOTE) { + num_objects_ready += 1; + } + } + } + free(reply); + return num_objects_ready; +} + +/* + * TODO: maybe move the plasma_client_* functions in another file. + * + * plasma_client_* represent functions implemented by client; so probably + * need to be in a different file. + */ + +void plasma_client_get(plasma_connection *conn, + object_id object_id, + object_buffer *object_buffer) { + CHECK(conn != NULL); + CHECK(conn->manager_conn >= 0); + + object_request request; + request.object_id = object_id; + + while (true) { + if (plasma_get_local(conn, object_id, object_buffer)) { + /* Object is in the local Plasma Store, and it is sealed. */ + return; + } + + switch (plasma_fetch_remote(conn, object_id)) { + case PLASMA_OBJECT_LOCAL: + /* Object has finished being transfered just after calling + * plasma_get_local(), and it is now in the local Plasma Store. Loop again + * to call plasma_get_local() and eventually return. */ + continue; + case PLASMA_OBJECT_REMOTE: + /* A fetch request has been already scheduled for object_id, so wait for + * it to complete. */ + request.type = PLASMA_QUERY_LOCAL; + break; + case PLASMA_OBJECT_NONEXISTENT: + /* Object doesn’t exist in the system so ask local scheduler to create it. + */ + /* TODO: scheduler_create_object(object_id); */ + /* Wait for the object to be (re)constructed and sealed either in the + * local Plasma Store or remotely. */ + request.type = PLASMA_QUERY_ANYWHERE; + break; + default: + CHECKM(0, "Unrecognizable object status.") + } + +/* + * Wait for object_id to (1) be transferred and sealed in the local + * Plasma Store, if available remotely, or (2) be (re)constructued either + * locally or remotely, if object_id didn't exist in the system. + * - if timeout, next iteration will retry plasma_fetch() or + * scheduler_create_object() + * - if request.status == PLASMA_OBJECT_LOCAL, next iteration + * will get object and return + * - if request.status == PLASMA_OBJECT_REMOTE, next iteration + * will call plasma_fetch() + * - if request.status == PLASMA_OBJECT_NONEXISTENT, next iteration + * will call scheduler_create_object() + */ +#define TIMEOUT_WAIT_MS 200 + plasma_wait_for_objects(conn, 1, &request, 1, TIMEOUT_WAIT_MS); + } +} + +int plasma_client_wait(plasma_connection *conn, + int num_object_ids, + object_id object_ids[], + uint64_t timeout, + int num_returns, + object_id return_object_ids[]) { + CHECK(conn->manager_conn >= 0); + CHECK(num_object_ids >= num_returns); + + object_request requests[num_object_ids]; + + /* Initialize array of object requests. We only care for the objects to be + * present in the system, not necessary in the local Plasma Store. Thus, we + * set the request type to PLASMA_QUERY_ANYWHERE. */ + for (int i = 0; i < num_object_ids; ++i) { + requests[i].object_id = object_ids[i]; + requests[i].type = PLASMA_QUERY_ANYWHERE; + } + + /* Loop until we get num_returns objects stored in the system either in the + * local Plasma Store or remotely. */ + uint64_t remaining_timeout = timeout; + while (true) { + struct timeval start, end; + gettimeofday(&start, NULL); + + int n = plasma_wait_for_objects(conn, num_object_ids, requests, num_returns, + MIN(remaining_timeout, TIMEOUT_WAIT_MS)); + + gettimeofday(&end, NULL); + float diff_ms = (end.tv_sec - start.tv_sec); + diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; + remaining_timeout = + (remaining_timeout >= diff_ms ? remaining_timeout - diff_ms : 0); + + if (n >= num_returns || remaining_timeout == 0) { + /* Either (1) num_returns requests are satisfied or (2) timeout expired. + * In both cases we return. */ + int idx_returns = 0; + + for (int i = 0; i < num_returns; ++i) { + if (requests[i].status == PLASMA_OBJECT_LOCAL || + requests[i].status == PLASMA_OBJECT_REMOTE) { + return_object_ids[idx_returns] = requests[i].object_id; + idx_returns += 1; + } + } + return idx_returns; + } + /* The timeout hasn't expired and we got less than num_returns in the + * system. Trigger reconstruction of the missing objects. */ + for (int i = 0; i < num_returns; ++i) { + if (requests[i].status == PLASMA_OBJECT_NONEXISTENT) { + /* Object doesn’t exist in the system so ask local scheduler to create + * object with ID requests[i].object_id. */ + /* TODO: scheduler_create_object(object_id); */ + printf("XXX Need to schedule object -- not implemented yet!\n"); + /* Subscribe to hear back when object_id is sealed. */ + } + } + } +} + +void plasma_client_multiget(plasma_connection *conn, + int num_object_ids, + object_id object_ids[], + object_buffer object_buffers[]) { + object_request requests[num_object_ids]; + + /* Set all request types to PLASMA_OBJECT_LOCAL, as we want to get all objects + * into the local Plasma Store. */ + for (int i = 0; i < num_object_ids; ++i) { + requests[i].object_id = object_ids[i]; + requests[i].type = PLASMA_QUERY_LOCAL; + } + + while (true) { + int n; + + /* Wait to get all objects in the system. The reason we call + * plasma_wait_for_objects() here instead of iterating over + * plasma_client_get() is to increase concurrency as plasma_client_get() is + * blocking. */ + n = plasma_wait_for_objects(conn, num_object_ids, requests, num_object_ids, + TIMEOUT_WAIT_MS); + + if (n == num_object_ids) { + /* All objects are in the system either on the local or a remote Plasma + * store, so we are done. */ + break; + } + + for (int i = 0; i < num_object_ids; ++i) { + if (requests[i].status == PLASMA_OBJECT_REMOTE) { + plasma_fetch_remote(conn, requests[i].object_id); + } else { + if (requests[i].status == PLASMA_OBJECT_NONEXISTENT) { + /* Object doesn’t exist so ask local scheduler to create it. */ + /* TODO: scheduler_create_object(requests[i].object_id); */ + printf("XXX Need to schedule object -- not implemented yet!\n"); + } + } + } + } + + /* Now get the data for every object. */ + for (int i = 0; i < num_object_ids; ++i) { + plasma_client_get(conn, object_ids[i], &object_buffers[i]); + } +} + +/** + * TODO: maybe move object_requests_* functions in another file. + * The object_request data structure is defined in plasma.h since + * it is used by plasma_request and plasma_reply, but there is no + * plasma.c file. + */ +void object_requests_copy(int num_object_requests, + object_request object_requests_dst[], + object_request object_requests_src[]) { + for (int i = 0; i < num_object_requests; ++i) { + object_requests_dst[i].object_id = object_requests_src[i].object_id; + object_requests_dst[i].type = object_requests_src[i].type; + object_requests_dst[i].status = object_requests_src[i].type; + } +} + +object_request *object_requests_get_object(object_id object_id, + int num_object_requests, + object_request object_requests[]) { + for (int i = 0; i < num_object_requests; ++i) { + if (object_ids_equal(object_requests[i].object_id, object_id)) { + return &object_requests[i]; + } + } + return NULL; +} + +void object_requests_set_status_all(int num_object_requests, + object_request object_requests[], + int status) { + for (int i = 0; i < num_object_requests; ++i) { + object_requests[i].status = status; + } +} + +void object_id_print(object_id obj_id) { + for (int i = 0; i < sizeof(object_id); ++i) { + printf("%u.", obj_id.id[i]); + if (i < sizeof(object_id) - 1) { + printf("."); + } + } +} + +void object_requests_print(int num_object_requests, + object_request object_requests[]) { + for (int i = 0; i < num_object_requests; ++i) { + printf("["); + for (int j = 0; j < sizeof(object_id); ++j) { + object_id_print(object_requests[i].object_id); + } + printf(" | %d | %d], ", object_requests[i].type, object_requests[i].status); + } + printf("\n"); +} diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index f28770615..65dc2826d 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -2,6 +2,7 @@ #define PLASMA_CLIENT_H #include +#include #include "plasma.h" @@ -77,9 +78,10 @@ int plasma_manager_connect(const char *addr, int port); * @param metadata_size The size in bytes of the metadata. If there is no metadata, this should be 0. * @param data The address of the newly created object will be written here. - * @return Void. + * @return True, if object was created, false, otherwise (e.g., if object has + * been already created). */ -void plasma_create(plasma_connection *conn, +bool plasma_create(plasma_connection *conn, object_id object_id, int64_t size, uint8_t *metadata, @@ -245,4 +247,267 @@ int plasma_subscribe(plasma_connection *conn); */ int get_manager_fd(plasma_connection *conn); -#endif +/* === ALTERNATE PLASMA CLIENT API === */ + +/** + * Object buffer data structure. + */ +typedef struct { + /** The size in bytes of the data object. */ + int64_t data_size; + /** The address of the data object. */ + uint8_t *data; + /** The metadata size in bytes. */ + int64_t metadata_size; + /** The address of the metadata. */ + uint8_t *metadata; +} object_buffer; + +/** + * Object information data structure. + */ +typedef struct { + /** The time when the object was created (sealed). */ + time_t last_access_time; + /** The time when the object was last accessed. */ + time_t creation_date; + uint64_t refcount; +} object_info; + +/** + * Get specified object from the local Plasma Store. This function is + * non-blocking. + * + * @param conn The object containing the connection state. + * @param object_id The ID of the object to get. + * @param object_buffer The data structure where the object information will + * be written, including object payload and metadata. + * @return True if the object is returned and false otherwise. + */ +bool plasma_get_local(plasma_connection *conn, + object_id object_id, + object_buffer *object_buffer); + +/** + * Initiates the fetch (transfer) of an object from a remote Plasma Store. + * + * If the object is stored in the local Plasma Store, tell the caller. + * + * If not, check whether the object is stored on a remote Plasma Store. If yes, + * and if a transfer for the object has either been scheduled or is in progress, + * then return. Otherwise schedule a transfer for the object. + * + * If the object is not available locally or remotely, the client has to tell + * local scheduler to (re)create the object. + * + * This function is non-blocking. + * + * @param conn The object containing the connection state. + * @param object_id The ID of the object we want to transfer. + * @return Status as returned by the get_status() function. Status can take the + * following values. + * - PLASMA_CLIENT_LOCAL, if the object is stored in the local Plasma + * Store. + * - PLASMA_CLIENT_TRANSFER, if the object is either currently being + * transferred or the transfer has been scheduled. + * - PLASMA_CLIENT_REMOTE, if the object is stored at a remote Plasma + * Store. + * - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in the + * system. + */ +int plasma_fetch_remote(plasma_connection *conn, object_id object_id); + +/** + * Return the status of a given object. This function is similar to + * plasma_fetch_remote() with the only difference that plamsa_fetch_remote() + * also schedules the obejct transfer, if not local. + * + * @param conn The object containing the connection state. + * @param object_id The ID of the object whose status we query. + * @return Status as returned by get_status() function. Status can take the + * following values. + * - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma Store. + * has been already scheduled by the Plasma Manager. + * - PLASMA_CLIENT_TRANSFER, if the object is either currently being + * transferred or just scheduled. + * - PLASMA_CLIENT_REMOTE, if the object is stored at a remote + * Plasma Store. + * - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in the + * system. + */ +int plasma_status(plasma_connection *conn, object_id object_id); + +/** + * Return the information associated to a given object. + * + * @param conn The object containing the connection state. + * @param object_id The ID of the object whose info the client queries. + * @param object_info The object's infirmation. + * @return PLASMA_CLIENT_LOCAL, if the object is in the local Plasma Store. + * PLASMA_CLIENT_NOT_LOCAL, if not. In this case, the caller needs to + * ignore data, metadata_size, and metadata fields. + */ +int plasma_info(plasma_connection *conn, + object_id object_id, + object_info *object_info); + +/** + * Wait for (1) a specified number of objects to be available (sealed) in the + * local Plasma Store or in a remote Plasma Store, or (2) for a timeout to + * expire. This is a blocking call. + * + * @param conn The object containing the connection state. + * @param num_object_requests Size of the object_requests array. + * @param object_requests Object event array. Each element contains a request + * for a particular object_id. The type of request is specified in the + * "type" field. + * - A PLASMA_OBJECT_LOCAL request is satisfied when object_id becomes + * available in the local Plasma Store. In this case, this function + * sets the "status" field to PLASMA_OBJECT_LOCAL. + * - A PLASMA_OBJECT_ANYWHERE request is satisfied when object_id becomes + * available either at the local Plasma Store or on a remote Plasma + * Store. In this case, the functions sets the "status" field to + * PLASMA_OBJECT_LOCAL or PLASMA_OBJECT_REMOTE. + * @param num_ready_objects The number of requests in object_requests array that + * must be satisfied before the function returns, unless it timeouts. + * The num_ready_objects should be no larger than num_object_requests. + * @param timeout_ms Timeout value in milliseconds. If this timeout expires + * before min_num_ready_objects of requests are satisfied, the function + * returns. + * @return Number of satisfied requests in the object_requests list. If the + * returned number is less than min_num_ready_objects this means that + * timeout expired. + */ +int plasma_wait_for_objects(plasma_connection *conn, + int num_object_requests, + object_request object_requests[], + int num_ready_objects, + uint64_t timeout_ms); + +/** + * TODO: maybe move the plasma_client_* functions in another file. + * + * plasma_client_* represent functions implemented by client; so probably + * need to be in a different file. + */ + +/** + * Get an object from the Plasma Store. This function will block until the + * object has been created and sealed in the Plasma Store. + * + * @param conn The object containing the connection state. + * @param object_id The ID of the object to get. + * @param object_buffer The data structure where the object information will be + * written, including object payload and metadata. + * @return Void. + */ +void plasma_client_get(plasma_connection *conn, + object_id object_id, + object_buffer *object_buffer); + +/** + * Wait for objects to be created (right now, wait for local objects). + * + * @param conn The object containing the connection state. + * @param num_object_ids Number of object IDs wait is called on. + * @param object_ids Object IDs wait is called on. + * @param timeout Wait will time out and return after this number of ms. + * @param num_returns Number of object IDs wait will return if it doesn't time + * out. + * @param return_object_ids Out parameter for the object IDs returned by wait. + * This is an array of size num_returns. If the number of objects that + * are ready when we time out, the objects will be stored in the last + * slots of the array and the number of objects is returned. + * @return Number of objects that are actually ready. + */ +int plasma_client_wait(plasma_connection *conn, + int num_object_ids, + object_id object_ids[], + uint64_t timeout, + int num_returns, + object_id return_object_ids[]); + +/** + * Get an array of objects from the Plasma Store. This function will block until + * all object in the array have been created and sealed in the Plasma Store. + * + * @param conn The object containing the connection state. + * @param num_object_ids The number of objects in the array to be returned. + * @param object_ids The array of object IDs to be returned. + * @param object_buffers The array of data structure where the information of + * the return objects will be stored. The objects appear + * in the same order as their IDs in the object_ids array, + * @return Void. + */ +void plasma_client_multiget(plasma_connection *conn, + int num_object_ids, + object_id object_ids[], + object_buffer object_buffers[]); + +/** + * TODO: maybe move object_requests_* functions in another file. + * The object_request data structure is defined in plasma.h since + * it is used by plasma_request and plasma_reply, but there is no + * plasma.c file. + */ + +/** + * Copy an array of object requests into another one. + * + * @param num_object_requests Number of elements in the object_requests arrays. + * @param object_requests_dst Destination object_requests array. + * @param object_requests_dst Source object_requests array. + * @return None. + */ +void object_requests_copy(int num_object_requests, + object_request object_requests_dst[], + object_request object_requests_src[]); + +/** + * Given an object ID, get the corresponding object request + * form an array of object requests. + * + * @param object_id Identifier of the requested object. + * @param num_object_requests Number of elements in the object requests array. + * @param object_requests The array of object requests which + * contains the object (object_id). + * @return Object request, if found; NULL, if not found. + */ +object_request *object_requests_get_object(object_id object_id, + int num_object_requests, + object_request object_requests[]); + +/** + * Initialize status of all object requests in an array. + * + * @param num_object_requests Number of elements in the array of object + * requests. + * @param object_requests Array of object requests. + * @param status Value with which we initialize the status of each object + * request in the array. + * @return Void. + */ +void object_requests_set_status_all(int num_object_requests, + object_request object_requests[], + int status); + +/** + * Print an object ID with bytes separated by ".". + * + * @param object_id Object ID to be printed. + * @return Void. + */ +void object_id_print(object_id object_id); + +/** + * Print all object requests in an array (for debugging purposes). + * + * @param num_object_requests Number of elements in the array of object + * requests. + * @param object_requests Array of object requests. + * @return Void. + */ +void object_requests_print(int num_object_requests, + object_request object_requests[]); + +#endif /* PLASMA_CLIENT_H */ diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 4e2c5a7f7..b388a0114 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -36,9 +36,103 @@ #include "state/db.h" #include "state/object_table.h" -typedef struct client_object_connection client_object_connection; +void wait_object_lookup_callback(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context); -/* Entry of the hashtable of objects that are available locally. */ +/** + * Process either the fetch or the status request. + * + * @param client_conn Client connection. + * @param object_id ID of the object for which we process this request. + * @param fetc If true we process a fetch request, and if false + * we process a status request. + * @return Void. + */ +void process_fetch_or_status_request(client_connection *client_conn, + object_id object_id, + bool fetch); +/** + * Request a transfer for the given object ID from the next manager believed to + * have a copy. Adds the request for this object ID to the queue of outgoing + * requests to the manager we want to try. + * + * requst_trasnfer_from() will lead to thic Plasma manager, call it S, + * (1) sending a PLASMA_TRANFER request for object_id to the other + * end-point, R, of the client_conn. R is a remote Plasma manager + * which is expected to store object_id. + * (2) upon receiving this request, R will invoke process_transfer_request() + * which will send a PLASMA_DATA request containing object_id back to + * S. + * (3) Upen receiving the PLASMA_DATA request, S, will invoke + * process_data_request() (via process_data_chunk()) to read object_id. + * Note that all requests that are exchanged between S and R are via FIFO + * queues. + * + * @param client_conn The context for the connection to this client. + * @param object_id The object ID we want to request a transfer of. + * @returns Void. + */ +void request_transfer_from(client_connection *client_conn, object_id object_id); + +/** + * Request the transfer from a remote node or get the status of + * a given object. This is called for an object that is stored at + * a remote Plasma Store. + * + * @param object_id ID of the object to transfer or to get its status. + * @param manager_cont Number of remote nodes object_id is stored at. + * @param manager_vector Array containing the Plasma Managers + * running at the nodes where object_id is stored. + * @param context Client connection. + * @param fetch If true, this was triggered by a fetc operation. If not. + * we request its status. + * @return Status of object_id as defined in plasma.h + */ +int request_fetch_or_status(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context, + bool fetch); + +/** + * Send requested object_id back to the Plasma Manager identified + * by (addr, port) which requested it. This is done via a + * PLASMA_DATA message. + * + * @param loop + * @param object_id The ID of the object being transferred to (addr, port). + * @param addr The address of the Plasma Manager object_id is sent to. + * @param port The port number of the Plasma Manager object_id is sent to. + * @param conn The client connection object. + */ +void process_transfer_request(event_loop *loop, + object_id object_id, + uint8_t addr[4], + int port, + client_connection *conn); + +/** + * Receive object_id requested by this Plamsa Manager from the remote Plasma + * Manager identified by client_sock. The object_id is sent via the PLASMA_DATA + * message. + * + * @param loop The event data structure. + * @param client_sock The sender's socket. + * @param object_id ID of the object being received. + * @param data_size Size of the data of object_id. + * @param metadata_size Size of the metadata of object_id. + * @param conn The connection object. + */ +void process_data_request(event_loop *loop, + int client_sock, + object_id object_id, + int64_t data_size, + int64_t metadata_size, + client_connection *conn); + +/** Entry of the hashtable of objects that are available locally. */ typedef struct { /** Object id of this object. */ object_id object_id; @@ -63,7 +157,7 @@ struct plasma_manager_state { /** Hash table of outstanding fetch requests. The key is * object id, value is a list of connections to the clients * who are blocking on a fetch of this object. */ - client_object_connection *fetch_connections; + client_object_request *fetch_requests; /** Initialize an empty hash map for the cache of local available object. */ available_object *local_available_objects; }; @@ -71,11 +165,11 @@ struct plasma_manager_state { plasma_manager_state *g_manager_state = NULL; /* The context for fetch and wait requests. These are per client, per object. */ -struct client_object_connection { +struct client_object_request { /** The ID of the object we are fetching or waiting for. */ object_id object_id; /** The client connection context, shared between other - * client_object_connections for the same client. */ + * client_object_requests for the same client. */ client_connection *client_conn; /** The ID for the timer that will time out the current request to the state * database or another plasma manager. */ @@ -84,7 +178,7 @@ struct client_object_connection { * timeout. */ int num_retries; /** Handle for a linked list. */ - client_object_connection *next; + client_object_request *next; /** Pointer to the array containing the manager locations of * this object. */ char **manager_vector; @@ -115,6 +209,8 @@ struct client_connection { * list of buffers to write. */ /* TODO(swang): Split into two queues, data transfers and data requests. */ plasma_request_buffer *transfer_queue; + /** Buffer used to receive transfers (data fetches) we want to ignore */ + plasma_request_buffer *ignore_buffer; /** File descriptor for the socket connected to the other * plasma manager. */ int fd; @@ -122,12 +218,16 @@ struct client_connection { int64_t timer_id; /** True if this client is in a "wait" and false if it is in a "fetch". */ bool is_wait; + /** True if we use new version of wait. */ + bool wait1; + /** True if we use the new version of fetch. */ + bool fetch1; /** If this client is processing a wait, this contains the object ids that * are already available. */ plasma_reply *wait_reply; /** The objects that we are waiting for and their callback * contexts, for either a fetch or a wait operation. */ - client_object_connection *active_objects; + client_object_request *active_objects; /** The number of objects that we have left to return for * this fetch or wait operation. */ int num_return_objects; @@ -139,12 +239,12 @@ struct client_connection { UT_hash_handle manager_hh; }; -void free_client_object_connection(client_object_connection *object_conn) { - for (int i = 0; i < object_conn->manager_count; ++i) { - free(object_conn->manager_vector[i]); +void free_client_object_request(client_object_request *object_req) { + for (int i = 0; i < object_req->manager_count; ++i) { + free(object_req->manager_vector[i]); } - free(object_conn->manager_vector); - free(object_conn); + free(object_req->manager_vector); + free(object_req); } void send_client_reply(client_connection *conn, plasma_reply *reply) { @@ -168,72 +268,83 @@ void send_client_failure_reply(object_id object_id, client_connection *conn) { * @return A pointer to the active object context, or NULL if * there isn't one. */ -client_object_connection *get_object_connection(client_connection *client_conn, - object_id object_id) { - client_object_connection *object_conn; +client_object_request *get_object_request(client_connection *client_conn, + object_id object_id) { + client_object_request *object_req; HASH_FIND(active_hh, client_conn->active_objects, &object_id, - sizeof(object_id), object_conn); - return object_conn; + sizeof(object_id), object_req); + return object_req; } -client_object_connection *add_object_connection(client_connection *client_conn, - object_id object_id) { +client_object_request *add_object_request(client_connection *client_conn, + object_id object_id) { + CHECK(client_conn); /* Create a new context for this client connection and object. */ - client_object_connection *object_conn = - malloc(sizeof(client_object_connection)); - if (!object_conn) { - return NULL; - } - object_conn->object_id = object_id; - object_conn->client_conn = client_conn; - object_conn->manager_count = 0; - object_conn->manager_vector = NULL; - object_conn->next_manager = 0; + client_object_request *object_req = malloc(sizeof(client_object_request)); + CHECK(object_req); + object_req->object_id = object_id; + object_req->client_conn = client_conn; + /* The timer ID returned by event_loop_add_timer is positive, so we can check + * if the timer is -1 to see if a timer has been added. */ + object_req->timer = -1; + object_req->manager_count = 0; + object_req->manager_vector = NULL; + object_req->next_manager = 0; /* Register the object context with the client context. */ - client_object_connection *temp_object_conn = NULL; + client_object_request *temp_object_conn = NULL; HASH_FIND(active_hh, client_conn->active_objects, &object_id, sizeof(object_id), temp_object_conn); CHECKM(temp_object_conn == NULL, "The hash table already has an object connection for this object ID."); HASH_ADD(active_hh, client_conn->active_objects, object_id, sizeof(object_id), - object_conn); + object_req); /* Register the object context with the manager state. */ - client_object_connection *fetch_connections; - HASH_FIND(fetch_hh, client_conn->manager_state->fetch_connections, &object_id, - sizeof(object_id), fetch_connections); + client_object_request *fetch_requests; + HASH_FIND(fetch_hh, client_conn->manager_state->fetch_requests, &object_id, + sizeof(object_id), fetch_requests); LOG_DEBUG("Registering fd %d for fetch.", client_conn->fd); - if (!fetch_connections) { - fetch_connections = NULL; - LL_APPEND(fetch_connections, object_conn); - HASH_ADD(fetch_hh, client_conn->manager_state->fetch_connections, object_id, - sizeof(object_id), fetch_connections); + if (!fetch_requests) { + fetch_requests = NULL; + LL_APPEND(fetch_requests, object_req); + HASH_ADD(fetch_hh, client_conn->manager_state->fetch_requests, object_id, + sizeof(object_id), fetch_requests); } else { - LL_APPEND(fetch_connections, object_conn); + LL_APPEND(fetch_requests, object_req); } - return object_conn; + return object_req; } -void remove_object_connection(client_connection *client_conn, - client_object_connection *object_conn) { +void remove_object_request(client_connection *client_conn, + client_object_request *object_req) { /* Deregister the object context with the client context. */ /* TODO(rkn): Check that object_conn is actually in the hash table. */ - HASH_DELETE(active_hh, client_conn->active_objects, object_conn); + HASH_DELETE(active_hh, client_conn->active_objects, object_req); /* Deregister the object context with the manager state. */ - client_object_connection *object_conns; - HASH_FIND(fetch_hh, client_conn->manager_state->fetch_connections, - &(object_conn->object_id), sizeof(object_conn->object_id), - object_conns); - CHECK(object_conns); + client_object_request *object_reqs; + HASH_FIND(fetch_hh, client_conn->manager_state->fetch_requests, + &(object_req->object_id), sizeof(object_req->object_id), + object_reqs); + CHECK(object_reqs); int len; - client_object_connection *tmp; - LL_COUNT(object_conns, tmp, len); + client_object_request *tmp; + LL_COUNT(object_reqs, tmp, len); if (len == 1) { - HASH_DELETE(fetch_hh, client_conn->manager_state->fetch_connections, - object_conns); + HASH_DELETE(fetch_hh, client_conn->manager_state->fetch_requests, + object_reqs); + } + LL_DELETE(object_reqs, object_req); + /* remove_object_request() is not always called from the request's timer + * handle, so we remove the request's timer explicitly here. If + * remove_object_request() is called from the the request's timer handle, the + * code will still work correctly. While the timer handle returning + * EVENT_LOOP_TIMER_DONE will trigger another call for removing the request's + * timer, that's ok as event_loop_remove_timer() is idempotent. */ + if (object_req->timer != -1) { + event_loop_remove_timer(client_conn->manager_state->loop, + object_req->timer); } - LL_DELETE(object_conns, object_conn); /* Free the object. */ - free_client_object_connection(object_conn); + free_client_object_request(object_req); } plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, @@ -246,7 +357,7 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, state->plasma_conn = plasma_connect(store_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY); state->manager_connections = NULL; - state->fetch_connections = NULL; + state->fetch_requests = NULL; if (db_addr) { state->db = db_connect(db_addr, db_port, "plasma_manager", manager_addr, manager_port); @@ -283,11 +394,11 @@ void destroy_plasma_manager_state(plasma_manager_state *state) { free(manager_conn); } - if (state->fetch_connections != NULL) { + if (state->fetch_requests != NULL) { LOG_DEBUG("There were outstanding fetch requests."); - client_object_connection *object_conn, *tmp; - HASH_ITER(fetch_hh, state->fetch_connections, object_conn, tmp) { - remove_object_connection(object_conn->client_conn, object_conn); + client_object_request *object_req, *tmp; + HASH_ITER(fetch_hh, state->fetch_requests, object_req, tmp) { + remove_object_request(object_req->client_conn, object_req); } } @@ -439,6 +550,31 @@ void process_data_chunk(event_loop *loop, event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn); } +void ignore_data_chunk(event_loop *loop, + int data_sock, + void *context, + int events) { + /* Read the object chunk. */ + client_connection *conn = (client_connection *) context; + plasma_request_buffer *buf = conn->ignore_buffer; + + /* Just read the transferred data into ignore_buf and then drop (free) it. */ + int done = read_object_chunk(conn, buf); + if (!done) { + return; + } + + free(buf->data); + if (buf->metadata) { + free(buf->metadata); + } + free(buf); + /* Switch to listening for requests from this socket, instead of reading + * object data. */ + event_loop_remove_file(loop, data_sock); + event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn); +} + client_connection *get_manager_connection(plasma_manager_state *state, const char *ip_addr, int port) { @@ -512,6 +648,19 @@ void process_transfer_request(event_loop *loop, LL_APPEND(manager_conn->transfer_queue, buf); } +/** + * Receive object_id requested by this Plamsa Manager from the remote Plasma + * Manager identified by client_sock. The object_id is sent via the PLASMA_DATA + * message. + * + * @param loop The event data structure. + * @param client_sock The sender's socket. + * @param object_id ID of the object being received. + * @param data_size Size of the data of object_id. + * @param metadata_size Size of the metadata of object_id. + * @param conn The connection object. + * @return Void. + */ void process_data_request(event_loop *loop, int client_sock, object_id object_id, @@ -525,46 +674,61 @@ void process_data_request(event_loop *loop, /* The corresponding call to plasma_release should happen in * process_data_chunk. */ - plasma_create(conn->manager_state->plasma_conn, object_id, data_size, NULL, - metadata_size, &(buf->data)); - LL_APPEND(conn->transfer_queue, buf); + bool success_create = + plasma_create(conn->manager_state->plasma_conn, object_id, data_size, + NULL, metadata_size, &(buf->data)); + /* If success_create == true, a new object has been created. + * If success_create == false the object creation has failed, possibly + * due to an object with the same ID already existing in the Plasma Store. */ + if (success_create) { + /* Add buffer where the fetched data is to be stored to + * conn->transfer_queue. */ + LL_APPEND(conn->transfer_queue, buf); + } CHECK(conn->cursor == 0); /* Switch to reading the data from this socket, instead of listening for * other requests. */ event_loop_remove_file(loop, client_sock); - event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk, - conn); + if (success_create) { + event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk, + conn); + } else { + /* Since plasma_create() has failed, we ignore the data transfer. We will + * receive this transfer in g_ignore_buf and then drop it. Allocate memory + * for data and metadata, if needed. All memory associated with + * buf/g_ignore_buf will be freed in ignore_data_chunkc(). */ + conn->ignore_buffer = buf; + buf->data = (uint8_t *) malloc(buf->data_size); + if (buf->metadata_size > 0) { + buf->metadata = (uint8_t *) malloc(buf->metadata_size); + } else { + buf->metadata = NULL; + } + event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, ignore_data_chunk, + conn); + } } -/** - * Request a transfer for the given object ID from the next manager believed to - * have a copy. Adds the request for this object ID to the queue of outgoing - * requests to the manager we want to try. - * - * @param client_conn The context for the connection to this client. - * @param object_id The object ID we want to request a transfer of. - * @returns Void. - */ void request_transfer_from(client_connection *client_conn, object_id object_id) { - client_object_connection *object_conn = - get_object_connection(client_conn, object_id); - CHECK(object_conn); - CHECK(object_conn->manager_count > 0); - CHECK(object_conn->next_manager >= 0 && - object_conn->next_manager < object_conn->manager_count); + client_object_request *object_req = + get_object_request(client_conn, object_id); + CHECK(object_req); + CHECK(object_req->manager_count > 0); + CHECK(object_req->next_manager >= 0 && + object_req->next_manager < object_req->manager_count); char addr[16]; int port; - parse_ip_addr_port(object_conn->manager_vector[object_conn->next_manager], - addr, &port); + parse_ip_addr_port(object_req->manager_vector[object_req->next_manager], addr, + &port); client_connection *manager_conn = get_manager_connection(client_conn->manager_state, addr, port); plasma_request_buffer *transfer_request = malloc(sizeof(plasma_request_buffer)); transfer_request->type = PLASMA_TRANSFER; - transfer_request->object_id = object_conn->object_id; + transfer_request->object_id = object_req->object_id; if (manager_conn->transfer_queue == NULL) { /* If we already have a connection to this manager and its inactive, @@ -575,23 +739,23 @@ void request_transfer_from(client_connection *client_conn, /* Add this transfer request to this connection's transfer queue. */ LL_APPEND(manager_conn->transfer_queue, transfer_request); /* On the next attempt, try the next manager in manager_vector. */ - ++object_conn->next_manager; - object_conn->next_manager %= object_conn->manager_count; + object_req->next_manager += 1; + object_req->next_manager %= object_req->manager_count; } int manager_timeout_handler(event_loop *loop, timer_id id, void *context) { - client_object_connection *object_conn = context; - client_connection *client_conn = object_conn->client_conn; - LOG_DEBUG("Timer went off, %d tries left", object_conn->num_retries); - if (object_conn->num_retries > 0) { - request_transfer_from(client_conn, object_conn->object_id); - object_conn->num_retries--; + client_object_request *object_req = context; + client_connection *client_conn = object_req->client_conn; + LOG_DEBUG("Timer went off, %d tries left", object_req->num_retries); + if (object_req->num_retries > 0) { + request_transfer_from(client_conn, object_req->object_id); + object_req->num_retries--; return MANAGER_TIMEOUT; } - plasma_reply reply = plasma_make_reply(object_conn->object_id); + plasma_reply reply = plasma_make_reply(object_req->object_id); reply.has_object = 0; send_client_reply(client_conn, &reply); - remove_object_connection(client_conn, object_conn); + remove_object_request(client_conn, object_req); return EVENT_LOOP_TIMER_DONE; } @@ -603,11 +767,11 @@ void request_transfer(object_id object_id, const char *manager_vector[], void *context) { client_connection *client_conn = (client_connection *) context; - client_object_connection *object_conn = - get_object_connection(client_conn, object_id); + client_object_request *object_req = + get_object_request(client_conn, object_id); /* If there's already an outstanding fetch for this object for this client, * let the outstanding request finish the work. */ - if (object_conn) { + if (object_req) { return; } /* If the object isn't on any managers, report a failure to the client. */ @@ -616,38 +780,48 @@ void request_transfer(object_id object_id, /* TODO(swang): Instead of immediately counting this as a failure, maybe * register a Redis callback for changes to this object table entry. */ free(manager_vector); - send_client_failure_reply(object_id, client_conn); + plasma_reply reply = plasma_make_reply(object_id); + reply.object_status = PLASMA_OBJECT_NONEXISTENT; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); return; } /* Register the new outstanding fetch with the current client connection. */ - object_conn = add_object_connection(client_conn, object_id); - if (!object_conn) { + object_req = add_object_request(client_conn, object_id); + if (!object_req) { LOG_DEBUG("Unable to allocate memory for object context."); send_client_failure_reply(object_id, client_conn); } /* Pick a different manager to request a transfer from on every attempt. */ - object_conn->manager_count = manager_count; - object_conn->manager_vector = malloc(manager_count * sizeof(char *)); - memset(object_conn->manager_vector, 0, manager_count * sizeof(char *)); + object_req->manager_count = manager_count; + object_req->manager_vector = malloc(manager_count * sizeof(char *)); + memset(object_req->manager_vector, 0, manager_count * sizeof(char *)); for (int i = 0; i < manager_count; ++i) { int len = strlen(manager_vector[i]); - object_conn->manager_vector[i] = malloc(len + 1); - strncpy(object_conn->manager_vector[i], manager_vector[i], len); - object_conn->manager_vector[i][len] = '\0'; + object_req->manager_vector[i] = malloc(len + 1); + strncpy(object_req->manager_vector[i], manager_vector[i], len); + object_req->manager_vector[i][len] = '\0'; } free(manager_vector); /* Wait for the object data for the default number of retries, which timeout * after a default interval. */ - object_conn->num_retries = NUM_RETRIES; - object_conn->timer = + object_req->num_retries = NUM_RETRIES; + object_req->timer = event_loop_add_timer(client_conn->manager_state->loop, MANAGER_TIMEOUT, - manager_timeout_handler, object_conn); + manager_timeout_handler, object_req); request_transfer_from(client_conn, object_id); } +bool is_object_local(plasma_manager_state *state, object_id object_id) { + available_object *entry; + HASH_FIND(hh, state->local_available_objects, &object_id, sizeof(object_id), + entry); + return entry != NULL; +} + void process_fetch_request(client_connection *client_conn, object_id object_id) { client_conn->is_wait = false; + client_conn->fetch1 = false; client_conn->wait_reply = NULL; plasma_reply reply = plasma_make_reply(object_id); if (client_conn->manager_state->db == NULL) { @@ -656,10 +830,7 @@ void process_fetch_request(client_connection *client_conn, return; } /* Return success immediately if we already have this object. */ - int is_local = 0; - plasma_contains(client_conn->manager_state->plasma_conn, object_id, - &is_local); - if (is_local) { + if (is_object_local(client_conn->manager_state, object_id)) { reply.has_object = 1; send_client_reply(client_conn, &reply); return; @@ -676,23 +847,24 @@ void process_fetch_request(client_connection *client_conn, void process_fetch_requests(client_connection *client_conn, int num_object_ids, - object_id object_ids[]) { + object_request object_requests[]) { for (int i = 0; i < num_object_ids; ++i) { ++client_conn->num_return_objects; - process_fetch_request(client_conn, object_ids[i]); + process_fetch_request(client_conn, object_requests[i].object_id); } } void return_from_wait(client_connection *client_conn) { CHECK(client_conn->is_wait); + /* TODO: check for wait1. */ client_conn->wait_reply->num_objects_returned = client_conn->wait_reply->num_object_ids - client_conn->num_return_objects; CHECK(plasma_send_reply(client_conn->fd, client_conn->wait_reply) >= 0); plasma_free_reply(client_conn->wait_reply); /* Clean the remaining object connections. */ - client_object_connection *object_conn, *tmp; - HASH_ITER(active_hh, client_conn->active_objects, object_conn, tmp) { - remove_object_connection(client_conn, object_conn); + client_object_request *object_req, *tmp; + HASH_ITER(active_hh, client_conn->active_objects, object_req, tmp) { + remove_object_request(client_conn, object_req); } } @@ -705,36 +877,425 @@ int wait_timeout_handler(event_loop *loop, timer_id id, void *context) { void process_wait_request(client_connection *client_conn, int num_object_ids, - object_id object_ids[], + object_request object_requests[], uint64_t timeout, - int num_returns) { + int num_ready_objects) { plasma_manager_state *manager_state = client_conn->manager_state; - client_conn->num_return_objects = num_returns; + client_conn->num_return_objects = num_ready_objects; client_conn->is_wait = true; + client_conn->wait1 = false; /* old wait */ + client_conn->fetch1 = false; client_conn->timer_id = event_loop_add_timer( manager_state->loop, timeout, wait_timeout_handler, client_conn); - client_conn->wait_reply = plasma_alloc_reply(num_returns); + client_conn->wait_reply = plasma_alloc_reply(num_ready_objects); for (int i = 0; i < num_object_ids; ++i) { available_object *entry; - HASH_FIND(hh, manager_state->local_available_objects, &object_ids[i], - sizeof(object_id), entry); + HASH_FIND(hh, manager_state->local_available_objects, + &(object_requests[i].object_id), + sizeof(object_requests[i].object_id), entry); if (entry) { /* If an object id occurs twice in object_ids, this will count them twice. * This might not be desirable behavior. */ client_conn->num_return_objects -= 1; - client_conn->wait_reply->object_ids[client_conn->num_return_objects] = - entry->object_id; + client_conn->wait_reply->object_requests[client_conn->num_return_objects] + .object_id = entry->object_id; if (client_conn->num_return_objects == 0) { event_loop_remove_timer(manager_state->loop, client_conn->timer_id); return_from_wait(client_conn); return; } } else { - add_object_connection(client_conn, object_ids[i]); + add_object_request(client_conn, object_requests[i].object_id); } } } +/** === START - ALTERNATE PLASMA CLIENT API === */ + +void return_from_wait1(client_connection *client_conn) { + CHECK(client_conn->is_wait); + CHECK(client_conn->wait1); + + CHECK(plasma_send_reply(client_conn->fd, client_conn->wait_reply) >= 0); + free(client_conn->wait_reply); + + /* Clean the remaining object connections. TODO(istoica): Check with Philipp. + */ + client_object_request *object_req, *tmp; + HASH_ITER(active_hh, client_conn->active_objects, object_req, tmp) { + remove_object_request(client_conn, object_req); + } +} + +int wait_timeout_handler1(event_loop *loop, timer_id id, void *context) { + client_connection *client_conn = context; + CHECK(client_conn->timer_id == id); + return_from_wait1(client_conn); + return EVENT_LOOP_TIMER_DONE; +} + +void process_wait_request1(client_connection *client_conn, + int num_object_requests, + object_request object_requests[], + uint64_t timeout, + int num_ready_objects) { + CHECK(client_conn != NULL); + + plasma_manager_state *manager_state = client_conn->manager_state; + client_conn->num_return_objects = num_ready_objects; + + /* We can only run a command at a time on any given client connection + * (client_conn) so set up is_wait so callback() can check whether we are + * still in wait(). */ + client_conn->is_wait = true; + client_conn->wait1 = true; /* new wait request */ + client_conn->fetch1 = false; + + client_conn->wait_reply = plasma_alloc_reply(num_object_requests); + object_requests_copy(num_object_requests, + client_conn->wait_reply->object_requests, + object_requests); + object_requests_set_status_all(num_object_requests, + client_conn->wait_reply->object_requests, + PLASMA_OBJECT_NONEXISTENT); + /* We will just return back the same object_requests list after setting the + * status of the requests. */ + client_conn->wait_reply->num_object_ids = num_object_requests; + + /* Add timer callback. If timeout expires, it invokes wait_timeout_handler(). + * If we get num_ready_objects before timeout expires, we remove the timer. */ + client_conn->timer_id = event_loop_add_timer( + manager_state->loop, timeout, wait_timeout_handler1, client_conn); + + /* Now check whether objects are in the Local Object store, and if not, check + * whether they are remote. */ + for (int i = 0; i < num_object_requests; ++i) { + if (is_object_local(manager_state, object_requests[i].object_id)) { + /* If an object ID occurs twice in object_requests, this will count them + * twice. This might not be desirable behavior. */ + client_conn->num_return_objects -= 1; + client_conn->wait_reply->object_requests[i].status = PLASMA_OBJECT_LOCAL; + if (client_conn->num_return_objects == 0) { + /* We got num_return_objects in the local Object Store, so return. */ + event_loop_remove_timer(manager_state->loop, client_conn->timer_id); + return_from_wait1(client_conn); + return; + } + } else { + object_request *object_request = + &client_conn->wait_reply->object_requests[i]; + + if (object_request->status == PLASMA_OBJECT_NONEXISTENT) { + if (get_object_request(client_conn, object_request->object_id)) { + /* This object is in transfer, which means that it is stored on a + * remote node. */ + client_conn->wait_reply->object_requests[i].status = + PLASMA_OBJECT_REMOTE; + if (client_conn->wait_reply->object_requests[i].type == + PLASMA_QUERY_ANYWHERE) { + client_conn->num_return_objects -= 1; + if (client_conn->num_return_objects == 0) { + /* We got num_return_objects in the local Object Store, so return. + */ + event_loop_remove_timer(manager_state->loop, + client_conn->timer_id); + return_from_wait1(client_conn); + return; + } + } + } + /* Subscribe to hear when object becomes available. */ + retry_info retry_subscribe = { + .num_retries = 0, .timeout = 0, .fail_callback = NULL, + }; + /* TODO(istoica): We should really cache the results here. */ + object_table_subscribe( + g_manager_state->db, + client_conn->wait_reply->object_requests[i].object_id, + wait_object_available_callback, (void *) client_conn, + &retry_subscribe, NULL, NULL); + /* TODO(istoica): Since the existing subscribe doesn't return when the + * object already exists in the Object Table, do a lookup as well. */ + retry_info retry_lookup = { + .num_retries = NUM_RETRIES, + .timeout = MANAGER_TIMEOUT, + .fail_callback = NULL, + }; + + object_table_lookup( + client_conn->manager_state->db, + client_conn->wait_reply->object_requests[i].object_id, + &retry_lookup, wait_object_lookup_callback, client_conn); + } + } + } +} + +void wait_object_lookup_callback(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { + if (manager_count > 0) { + wait_object_available_callback(object_id, context); + } +} + +void wait_object_available_callback(object_id object_id, void *user_context) { + client_connection *client_conn = (client_connection *) user_context; + CHECK(client_conn != NULL); + plasma_manager_state *manager_state = client_conn->manager_state; + CHECK(manager_state); + + if ((!client_conn->is_wait) || (!client_conn->wait1)) { + return; + } + + plasma_reply *wait_reply = client_conn->wait_reply; + object_request *object_request; + object_request = object_requests_get_object( + object_id, wait_reply->num_object_ids, wait_reply->object_requests); + if (object_request == NULL) { + /* Maybe this is from a previous wait call, so ignore it. */ + return; + } + + /* Check first whether object is avilable in the local Plasma Store. */ + if (is_object_local(manager_state, object_id)) { + client_conn->num_return_objects -= 1; + object_request->status = PLASMA_OBJECT_LOCAL; + } else { + object_request->status = PLASMA_OBJECT_REMOTE; + if (object_request->type == PLASMA_QUERY_ANYWHERE) { + client_conn->num_return_objects -= 1; + } + } + + if (client_conn->num_return_objects == 0) { + /* We got num_return_objects in the local Object Store, so return. */ + event_loop_remove_timer(manager_state->loop, client_conn->timer_id); + return_from_wait1(client_conn); + } +} + +void wait_process_object_available_local(client_connection *client_conn, + object_id object_id) { + CHECK(client_conn != NULL); + if (!client_conn->is_wait) { + return; + } + + plasma_reply *wait_reply = client_conn->wait_reply; + object_request *object_request; + object_request = object_requests_get_object( + object_id, wait_reply->num_object_ids, wait_reply->object_requests); + if (object_request) { + client_conn->num_return_objects -= 1; + object_request->status = PLASMA_OBJECT_LOCAL; + } +} + +/** + * Handler handling the timeout experiation of a transfer request. + * + * @param loop Event loop. + * @param timer_id ID of the timer which has expired. + * @param contect Client connection. + * @return Void. + */ +int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { + CHECK(loop); + CHECK(context); + + client_object_request *object_req = context; + client_connection *client_conn = object_req->client_conn; + + LOG_DEBUG("Timer went off, %d tries left", object_req->num_retries); + + if (object_req->num_retries > 0) { + request_transfer_from(client_conn, object_req->object_id); + object_req->num_retries--; + return MANAGER_TIMEOUT; + } + plasma_reply reply = plasma_make_reply(object_req->object_id); + reply.object_status = PLASMA_OBJECT_NONEXISTENT; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); + + remove_object_request(client_conn, object_req); + return EVENT_LOOP_TIMER_DONE; +} + +/** + * Request the transfer from a remote node. + * + * @param object_id ID of the object to transfer. + * @param manager_cont Number of remote nodes object_id is stored at. + * @param manager_vector Array containing the Plasma Managers running at the + * nodes where object_id is stored. + * @param context Client connection. + * @return Void. + */ +void request_fetch_initiate(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { + client_connection *client_conn = (client_connection *) context; + int status = request_fetch_or_status(object_id, manager_count, manager_vector, + context, true); + plasma_reply reply = plasma_make_reply(object_id); + reply.object_status = status; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); +} + +/** + * Check whether a non-local object is stored on any remot enote or not. + * + * @param object_id ID of the object whose status we require. + * @param manager_cont Number of remote nodes object_id is stored at. If + * manager_count > 0, then object_id exists on a remote node an its + * status is PLASMA_OBJECT_REMOTE. Otherwise, if manager_count == 0, the + * object doesn't exist in the system and its status is + * PLASMA_OBJECT_NONEXISTENT. + * @param manager_vector Array containing the Plasma Managers running at the + * nodes where object_id is stored. Not used; it will be eventually + * deallocated. + * @param context Client connection. + * @return Void. + */ +void request_status_done(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { + client_connection *client_conn = (client_connection *) context; + int status = request_fetch_or_status(object_id, manager_count, manager_vector, + context, false); + plasma_reply reply = plasma_make_reply(object_id); + reply.object_status = status; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); +} + +int request_fetch_or_status(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context, + bool fetch) { + client_connection *client_conn = (client_connection *) context; + client_object_request *object_req = + get_object_request(client_conn, object_id); + + /* Return success immediately if we already have this object. */ + if (is_object_local(client_conn->manager_state, object_id)) { + return PLASMA_OBJECT_LOCAL; + } + + /* Check wether there's already an outstanding fetch for this object for this + * client, and if yes let the outstanding request finish the work. Note that + * we have already checked for this in process_fetch_or_status_request(), but + * we need to check again here as the object could hav been evicted since + * then. */ + if (object_req) { + return PLASMA_OBJECT_IN_TRANSFER; + } + + /* If the object isn't on any managers, report a failure to the client. */ + LOG_DEBUG("Object is on %d managers", manager_count); + if (manager_count == 0) { + free(manager_vector); + if (object_req) { + remove_object_request(client_conn, object_req); + } + return PLASMA_OBJECT_NONEXISTENT; + } + + if (fetch) { + /* Register the new outstanding fetch with the current client connection. */ + object_req = add_object_request(client_conn, object_id); + CHECKM(object_req != NULL, "Unable to allocate memory for object context."); + + /* Pick a different manager to request a transfer from on every attempt. */ + object_req->manager_count = manager_count; + object_req->manager_vector = malloc(manager_count * sizeof(char *)); + memset(object_req->manager_vector, 0, manager_count * sizeof(char *)); + for (int i = 0; i < manager_count; ++i) { + int len = strlen(manager_vector[i]); + object_req->manager_vector[i] = malloc(len + 1); + strncpy(object_req->manager_vector[i], manager_vector[i], len); + object_req->manager_vector[i][len] = '\0'; + } + free(manager_vector); + /* Wait for the object data for the default number of retries, which timeout + * after a default interval. */ + object_req->num_retries = NUM_RETRIES; + object_req->object_id = object_id; + object_req->timer = + event_loop_add_timer(client_conn->manager_state->loop, MANAGER_TIMEOUT, + fetch_timeout_handler, object_req); + request_transfer_from(client_conn, object_id); + /* Let scheduling the fetch request proceded and return. */ + }; + + /* Since object is not stored at the local locally, manager_count > 0 means + * that the object is stored at another remote object. Otherwise, if + * manager_count == 0, the object is not stored anywhere. */ + return (manager_count > 0 ? PLASMA_OBJECT_REMOTE : PLASMA_OBJECT_NONEXISTENT); +} + +void object_table_lookup_fail_callback(object_id object_id, + void *user_context, + void *user_data) { + /* Fail for now. Later, we may want to send a PLASMA_OBJECT_NONEXISTENT to the + * client. */ + CHECK(0); +} + +void process_fetch_or_status_request(client_connection *client_conn, + object_id object_id, + bool fetch) { + client_conn->is_wait = false; + client_conn->fetch1 = true; + client_conn->wait_reply = NULL; + + /* Return success immediately if we already have this object. */ + if (is_object_local(client_conn->manager_state, object_id)) { + plasma_reply reply = plasma_make_reply(object_id); + reply.object_status = PLASMA_OBJECT_LOCAL; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); + return; + } + + /* Check whether a transfer request for this object is already pending. */ + if (get_object_request(client_conn, object_id)) { + plasma_reply reply = plasma_make_reply(object_id); + reply.object_status = PLASMA_OBJECT_IN_TRANSFER; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); + return; + } + + if (client_conn->manager_state->db == NULL) { + plasma_reply reply = plasma_make_reply(object_id); + reply.object_status = PLASMA_OBJECT_NONEXISTENT; + CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); + return; + } + + /* The object is not local, so check whether it is stored remotely. */ + retry_info retry = { + .num_retries = NUM_RETRIES, + .timeout = MANAGER_TIMEOUT, + .fail_callback = object_table_lookup_fail_callback, + }; + + if (fetch) { + /* Request a transfer from a plasma manager that has this object, if any. */ + object_table_lookup(client_conn->manager_state->db, object_id, &retry, + request_fetch_initiate, client_conn); + } else { + object_table_lookup(client_conn->manager_state->db, object_id, &retry, + request_status_done, client_conn); + } +} + +/* === END - ALTERNATE PLASMA CLIENT API === */ + void process_object_notification(event_loop *loop, int client_sock, void *context, @@ -760,31 +1321,42 @@ void process_object_notification(event_loop *loop, entry); /* Notify any clients who were waiting on a fetch to this object and tick * off objects we are waiting for. */ - client_object_connection *object_conn, *next; + client_object_request *object_req, *next; client_connection *client_conn; - HASH_FIND(fetch_hh, state->fetch_connections, &obj_id, sizeof(object_id), - object_conn); + HASH_FIND(fetch_hh, state->fetch_requests, &obj_id, sizeof(object_id), + object_req); plasma_reply reply = plasma_make_reply(obj_id); reply.has_object = 1; - while (object_conn) { - next = object_conn->next; - client_conn = object_conn->client_conn; + while (object_req) { + next = object_req->next; + client_conn = object_req->client_conn; if (!client_conn->is_wait) { - event_loop_remove_timer(state->loop, object_conn->timer); - send_client_reply(client_conn, &reply); + event_loop_remove_timer(state->loop, object_req->timer); + if (!client_conn->fetch1) { + send_client_reply(client_conn, &reply); + } } else { - client_conn->num_return_objects -= 1; - client_conn->wait_reply->object_ids[client_conn->num_return_objects] = - obj_id; + if (client_conn->wait1) { + wait_process_object_available_local(client_conn, obj_id); + } else { + client_conn->num_return_objects -= 1; + client_conn->wait_reply + ->object_requests[client_conn->num_return_objects] + .object_id = obj_id; + } if (client_conn->num_return_objects == 0) { event_loop_remove_timer(loop, client_conn->timer_id); - return_from_wait(client_conn); - object_conn = next; + if (client_conn->wait1) { + return_from_wait1(client_conn); + } else { + return_from_wait(client_conn); + } + object_req = next; continue; } } - remove_object_connection(client_conn, object_conn); - object_conn = next; + remove_object_request(client_conn, object_req); + object_req = next; } } @@ -800,24 +1372,45 @@ void process_message(event_loop *loop, switch (type) { case PLASMA_TRANSFER: - process_transfer_request(loop, req->object_ids[0], req->addr, req->port, - conn); + DCHECK(req->num_object_ids == 1); + process_transfer_request(loop, req->object_requests[0].object_id, req->addr, + req->port, conn); break; case PLASMA_DATA: LOG_DEBUG("Starting to stream data"); - process_data_request(loop, client_sock, req->object_ids[0], req->data_size, - req->metadata_size, conn); + DCHECK(req->num_object_ids == 1); + process_data_request(loop, client_sock, req->object_requests[0].object_id, + req->data_size, req->metadata_size, conn); break; case PLASMA_FETCH: LOG_DEBUG("Processing fetch"); - process_fetch_requests(conn, req->num_object_ids, req->object_ids); + process_fetch_requests(conn, req->num_object_ids, req->object_requests); + break; + case PLASMA_FETCH_REMOTE: + LOG_DEBUG("Processing fetch remote"); + DCHECK(req->num_object_ids == 1); + process_fetch_or_status_request(conn, req->object_requests[0].object_id, + true); break; case PLASMA_WAIT: LOG_DEBUG("Processing wait"); - process_wait_request(conn, req->num_object_ids, req->object_ids, - req->timeout, req->num_returns); + process_wait_request(conn, req->num_object_ids, req->object_requests, + req->timeout, req->num_ready_objects); + break; + case PLASMA_WAIT1: + LOG_DEBUG("Processing wait1"); + process_wait_request1(conn, req->num_object_ids, req->object_requests, + req->timeout, req->num_ready_objects); + break; + case PLASMA_STATUS: + LOG_DEBUG("Processing status"); + DCHECK(req->num_object_ids == 1); + process_fetch_or_status_request(conn, req->object_requests[0].object_id, + false); break; case PLASMA_SEAL: { + LOG_DEBUG("Publishing to object table from DB client %d.", + get_client_id(conn->manager_state->db)); /* TODO(swang): Log the error if we fail to add the object, and possibly * retry later? */ retry_info retry = { @@ -825,8 +1418,10 @@ void process_message(event_loop *loop, .timeout = MANAGER_TIMEOUT, .fail_callback = NULL, }; - object_table_add(conn->manager_state->db, req->object_ids[0], &retry, NULL, - NULL); + if (conn->manager_state->db) { + object_table_add(conn->manager_state->db, + req->object_requests[0].object_id, &retry, NULL, NULL); + } } break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index 4d314f88e..d6ac7b716 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -22,7 +22,7 @@ typedef struct plasma_manager_state plasma_manager_state; typedef struct client_connection client_connection; -typedef struct client_object_connection client_object_connection; +typedef struct client_object_request client_object_request; /** * Initializes the plasma manager state. This connects the manager to the local @@ -34,8 +34,8 @@ typedef struct client_object_connection client_object_connection; * @param store_socket_name The socket name used to connect to the local store. * @param manager_addr Our IP address. * @param manager_port The IP port that we listen on. - * @param db_addr The IP address of the database to connect to. If this is - * NULL, then the manager will be initialized without a database + * @param db_addr The IP address of the database to connect to. If this is NULL, + * then the manager will be initialized without a database * connection. * @param db_port The IP port of the database to connect to. * @return A pointer to the initialized plasma manager state. @@ -59,8 +59,7 @@ void destroy_plasma_manager_state(plasma_manager_state *state); * * @param loop This is the event loop of the plasma manager. * @param object_id The object_id of the object we will be sending. - * @param addr The IP address of the plasma manager we are sending the object - * to. + * @param addr The IP address of the plasma manager to send the object to. * @param port The port of the plasma manager we are sending the object to. * @param conn The client_connection to the other plasma manager. * @return Void. @@ -138,12 +137,12 @@ void process_fetch_request(client_connection *client_conn, object_id object_id); * @param client_conn The connection context for the client that made the * request. * @param num_object_ids The number of object IDs requested. - * @param object_ids[] The vector of object IDs requested. + * @param object_requests[] The object requests fetch is called on. * @return Void. */ void process_fetch_requests(client_connection *client_conn, int num_object_ids, - object_id object_ids[]); + object_request object_requests[]); /** * Process a wait request from a client. @@ -151,7 +150,7 @@ void process_fetch_requests(client_connection *client_conn, * @param client_conn The connection context for the client that made the * request. * @param num_object_ids Number of object IDs wait is called on. - * @param object_ids Object IDs wait is called on. + * @param object_requests The object requests wait is called on. * @param timeout Wait will time out and return after this number of * milliseconds. * @param num_returns Number of object IDs wait will return if it doesn't time @@ -160,7 +159,7 @@ void process_fetch_requests(client_connection *client_conn, */ void process_wait_request(client_connection *client_conn, int num_object_ids, - object_id object_ids[], + object_request object_requests[], uint64_t timeout, int num_returns); @@ -188,7 +187,7 @@ void process_object_notification(event_loop *loop, * @param loop This is the event loop of the plasma manager. * @param data_sock This is the socket the other plasma manager is listening on. * @param context The client_connection to the other plasma manager, contains a - * queue of objects that will be sent. + * queue of objects that will be sent. * @return Void. */ void send_queued_request(event_loop *loop, @@ -243,8 +242,8 @@ struct plasma_request_buffer { * create. * @return A pointer to the newly created object context. */ -client_object_connection *add_object_connection(client_connection *client_conn, - object_id object_id); +client_object_request *add_object_request(client_connection *client_conn, + object_id object_id); /** * Given an object ID and the managers it can be found on, start requesting a @@ -272,8 +271,8 @@ void request_transfer(object_id object_id, * @param object_id The object ID whose context we want to delete. * @return Void. */ -void remove_object_connection(client_connection *client_conn, - client_object_connection *object_conn); +void remove_object_request(client_connection *client_conn, + client_object_request *object_req); /** * Get a connection to the remote manager at the specified address. Creates a @@ -327,4 +326,46 @@ event_loop *get_event_loop(plasma_manager_state *state); */ int get_client_sock(client_connection *conn); +/** + * Process a wait request from a client. + * + * @param client_conn The connection context for the client that made the + * request. + * @param num_object_requests Number of object requests wait is called on. + * @param object_requests The array of bject requests wait is called on. + * @param timeout Wait will time out and return after this number of + * milliseconds. + * @param num_returns Number of object requests that will be satsified before + * wait will retunr, unless it timeouts. + * @return Void. + */ +void process_wait_request1(client_connection *client_conn, + int num_object_requests, + object_request object_requests[], + uint64_t timeout, + int num_ready_objects); + +/** + * Callback to be invoked when object_id entry is changed in the + * Object Table. We assume that the change means the object is available. + * + * @param object_id ID of the object becoming available locally or remotely. + * @param user_context This is the client connection on which the wait has been + * called. + * @return Void. + */ +void wait_object_available_callback(object_id object_id, void *user_context); + +/** + * Object is available (sealed) in the local Object Store. This is part of + * executing wait operation. + * + * @param client_conn The client conection. + * @param user_context This is the client connection on which the wait has been + * called. + * @return Void. + */ +void wait_process_object_available_local(client_connection *client_conn, + object_id object_id); + #endif /* PLASMA_MANAGER_H */ diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 88615b3fe..a19c124e7 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -32,6 +32,7 @@ #include "fling.h" #include "malloc.h" #include "plasma_store.h" +#include "plasma.h" void *dlmalloc(size_t); void dlfree(void *); @@ -131,7 +132,7 @@ void add_client_to_object_clients(object_table_entry *entry, } /* Create a new object buffer in the hash table. */ -void create_object(client *client_context, +bool create_object(client *client_context, object_id obj_id, int64_t data_size, int64_t metadata_size, @@ -142,7 +143,11 @@ void create_object(client *client_context, /* TODO(swang): Return these error to the client instead of exiting. */ HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id, sizeof(obj_id), entry); - CHECKM(entry == NULL, "Cannot create object twice."); + if (entry != NULL) { + /* There is already an object with the same ID in the Plasma Store, so + * ignore this requst. */ + return false; + } /* Tell the eviction policy how much space we need to create this object. */ int64_t num_objects_to_evict; object_id *objects_to_evict; @@ -167,7 +172,7 @@ void create_object(client *client_context, entry->fd = fd; entry->map_size = map_size; entry->offset = offset; - entry->state = OPEN; + entry->state = PLASMA_CREATED; utarray_new(entry->clients, &client_icd); HASH_ADD(handle, plasma_state->plasma_store_info->objects, object_id, sizeof(object_id), entry); @@ -184,6 +189,7 @@ void create_object(client *client_context, obj_id); /* Record that this client is using this object. */ add_client_to_object_clients(entry, client_context); + return true; } /* Get an object from the hash table. */ @@ -195,7 +201,7 @@ int get_object(client *client_context, object_table_entry *entry; HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, sizeof(object_id), entry); - if (entry && entry->state == SEALED) { + if (entry && entry->state == PLASMA_SEALED) { result->handle.store_fd = entry->fd; result->handle.mmap_size = entry->map_size; result->data_offset = entry->offset; @@ -224,6 +230,30 @@ int get_object(client *client_context, return OBJECT_NOT_FOUND; } +/* Get an object from the local Plasma Store if exists. */ +int get_object_local(client *client_context, + int conn, + object_id object_id, + plasma_object *result) { + plasma_store_state *plasma_state = client_context->plasma_state; + object_table_entry *entry; + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, + sizeof(object_id), entry); + if (entry && entry->state == PLASMA_SEALED) { + result->handle.store_fd = entry->fd; + result->handle.mmap_size = entry->map_size; + result->data_offset = entry->offset; + result->metadata_offset = entry->offset + entry->info.data_size; + result->data_size = entry->info.data_size; + result->metadata_size = entry->info.metadata_size; + /* If necessary, record that this client is using this object. In the case + * where entry == NULL, this will be called from seal_object. */ + add_client_to_object_clients(entry, client_context); + return OBJECT_FOUND; + } + return OBJECT_NOT_FOUND; +} + int remove_client_from_object_clients(object_table_entry *entry, client *client_info) { /* Find the location of the client in the array. */ @@ -269,7 +299,8 @@ int contains_object(client *client_context, object_id object_id) { object_table_entry *entry; HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, sizeof(object_id), entry); - return entry && (entry->state == SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND; + return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND + : OBJECT_NOT_FOUND; } /* Seal an object that has been created in the hash table. */ @@ -280,9 +311,9 @@ void seal_object(client *client_context, object_id object_id) { HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, sizeof(object_id), entry); CHECK(entry != NULL); - CHECK(entry->state == OPEN); + CHECK(entry->state == PLASMA_CREATED); /* Set the state of object to SEALED. */ - entry->state = SEALED; + entry->state = PLASMA_SEALED; /* Inform all subscribers that a new object has been sealed. */ notification_queue *queue, *temp_queue; HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) { @@ -329,7 +360,7 @@ void delete_object(plasma_store_state *plasma_state, object_id object_id) { * error. Maybe we should also support deleting objects that have been created * but not sealed. */ CHECKM(entry != NULL, "To delete an object it must be in the object table."); - CHECKM(entry->state == SEALED, + CHECKM(entry->state == PLASMA_SEALED, "To delete an object it must have been sealed."); CHECKM(utarray_len(entry->clients) == 0, "To delete an object, there must be no clients currently using it."); @@ -429,29 +460,54 @@ void process_message(event_loop *loop, /* Process the different types of requests. */ switch (type) { case PLASMA_CREATE: - create_object(client_context, req->object_ids[0], req->data_size, - req->metadata_size, &reply.object); + DCHECK(req->num_object_ids == 1); + if (create_object(client_context, req->object_requests[0].object_id, + req->data_size, req->metadata_size, &reply.object)) { + reply.error_code = PLASMA_REPLY_OK; + } else { + reply.error_code = PLASMA_OBJECT_ALREADY_EXISTS; + } CHECK(plasma_send_reply(client_sock, &reply) >= 0); CHECK(send_fd(client_sock, reply.object.handle.store_fd) >= 0); break; case PLASMA_GET: - if (get_object(client_context, client_sock, req->object_ids[0], + DCHECK(req->num_object_ids == 1); + if (get_object(client_context, client_sock, + req->object_requests[0].object_id, &reply.object) == OBJECT_FOUND) { CHECK(plasma_send_reply(client_sock, &reply) >= 0); CHECK(send_fd(client_sock, reply.object.handle.store_fd) >= 0); } break; + case PLASMA_GET_LOCAL: + DCHECK(req->num_object_ids == 1); + if (get_object_local(client_context, client_sock, + req->object_requests[0].object_id, + &reply.object) == OBJECT_FOUND) { + reply.has_object = true; + CHECK(plasma_send_reply(client_sock, &reply) >= 0); + CHECK(send_fd(client_sock, reply.object.handle.store_fd) >= 0); + } else { + reply.has_object = false; + CHECK(plasma_send_reply(client_sock, &reply) >= 0); + CHECK(send_fd(client_sock, reply.object.handle.store_fd) >= 0); + } + break; case PLASMA_RELEASE: - release_object(client_context, req->object_ids[0]); + DCHECK(req->num_object_ids == 1); + release_object(client_context, req->object_requests[0].object_id); break; case PLASMA_CONTAINS: - if (contains_object(client_context, req->object_ids[0]) == OBJECT_FOUND) { + DCHECK(req->num_object_ids == 1); + if (contains_object(client_context, req->object_requests[0].object_id) == + OBJECT_FOUND) { reply.has_object = 1; } CHECK(plasma_send_reply(client_sock, &reply) >= 0); break; case PLASMA_SEAL: - seal_object(client_context, req->object_ids[0]); + DCHECK(req->num_object_ids == 1); + seal_object(client_context, req->object_requests[0].object_id); break; case PLASMA_DELETE: /* TODO(rkn): In the future, we can use this method to give hints to the diff --git a/src/plasma/plasma_store.h b/src/plasma/plasma_store.h index 235c6c171..9642cccfe 100644 --- a/src/plasma/plasma_store.h +++ b/src/plasma/plasma_store.h @@ -15,9 +15,9 @@ typedef struct plasma_store_state plasma_store_state; * @param object_id Object ID of the object to be created. * @param data_size Size in bytes of the object to be created. * @param metadata_size Size in bytes of the object metadata. - * @return Void. + * @return False if the object already exists, otherwise true. */ -void create_object(client *client_context, +bool create_object(client *client_context, object_id object_id, int64_t data_size, int64_t metadata_size, @@ -41,6 +41,24 @@ int get_object(client *client_context, object_id object_id, plasma_object *result); +/** + * Get an object from the local Plasma Store. This function is not blocking. + * + * Once a client gets an object it must release it when it is done with it. + * This function is indepontent. If a client calls repeatedly get_object_local() + * on the same object_id, the client needs to call release_object() only once. + * + * @param client_context The context of the client making this request. + * @param conn The client connection that requests the object. + * @param object_id Object ID of the object to be gotten. + * @return Return OBJECT_FOUND if object was found, and OBJECT_NOT_FOUND + * otherwise. + */ +int get_object_local(client *client_context, + int conn, + object_id object_id, + plasma_object *result); + /** * Record the fact that a particular client is no longer using an object. * diff --git a/src/plasma/test/client_tests.c b/src/plasma/test/client_tests.c new file mode 100644 index 000000000..febed9432 --- /dev/null +++ b/src/plasma/test/client_tests.c @@ -0,0 +1,379 @@ +#include "greatest.h" + +#include +#include +#include + +#include "plasma.h" +#include "plasma_client.h" + +SUITE(plasma_client_tests); + +TEST plasma_status_tests(void) { + plasma_connection *plasma_conn1 = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + plasma_connection *plasma_conn2 = plasma_connect( + "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid1 = globally_unique_id(); + + /* Test for object non-existence. */ + int status = plasma_status(plasma_conn1, oid1); + ASSERT(status == PLASMA_OBJECT_NONEXISTENT); + + /* Test for the object being in local Plasma store. */ + /* First create object. */ + int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); + plasma_seal(plasma_conn1, oid1); + /* Sleep to avoid race condition of Plasma Manager waiting for notification. + */ + sleep(1); + status = plasma_status(plasma_conn1, oid1); + ASSERT(status == PLASMA_OBJECT_LOCAL); + + /* Test for object being remote. */ + status = plasma_status(plasma_conn2, oid1); + ASSERT(status == PLASMA_OBJECT_REMOTE); + + plasma_disconnect(plasma_conn1); + plasma_disconnect(plasma_conn2); + + PASS(); +} + +TEST plasma_fetch_remote_tests(void) { + plasma_connection *plasma_conn1 = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + plasma_connection *plasma_conn2 = plasma_connect( + "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid1 = globally_unique_id(); + + /* Test for object non-existence. */ + int status; + + /* No object in the system */ + status = plasma_fetch_remote(plasma_conn1, oid1); + ASSERT(status == PLASMA_OBJECT_NONEXISTENT); + + /* Test for the object being in local Plasma store. */ + /* First create object. */ + int64_t data_size = 100; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); + plasma_seal(plasma_conn1, oid1); + + /* Object with ID oid1 has been just inserted. On the next fetch we might + * either find the object or not, depending on whether the Plasma Manager has + * received the notification from the Plasma Store or not. */ + status = plasma_fetch_remote(plasma_conn1, oid1); + ASSERT((status == PLASMA_OBJECT_LOCAL) || + (status == PLASMA_OBJECT_NONEXISTENT)); + + /* Sleep to make sure Plasma Manager got the notification. */ + sleep(1); + status = plasma_fetch_remote(plasma_conn1, oid1); + ASSERT(status == PLASMA_OBJECT_LOCAL); + + /* Test for object being remote. */ + status = plasma_fetch_remote(plasma_conn2, oid1); + ASSERT(status == PLASMA_OBJECT_REMOTE); + + /* Sleep to make sure the object has been fetched and it is now stored in the + * local Plasma Store. */ + sleep(1); + status = plasma_fetch_remote(plasma_conn2, oid1); + ASSERT(status == PLASMA_OBJECT_LOCAL); + + sleep(1); + plasma_disconnect(plasma_conn1); + plasma_disconnect(plasma_conn2); + + PASS(); +} + +void init_data_123(uint8_t *data, uint64_t size, uint8_t base) { + for (int i = 0; i < size; i++) { + data[i] = base + i; + } +} + +bool is_equal_data_123(uint8_t *data1, uint8_t *data2, uint64_t size) { + for (int i = 0; i < size; i++) { + if (data1[i] != data2[i]) { + return false; + }; + } + return true; +} + +TEST plasma_get_local_tests(void) { + plasma_connection *plasma_conn = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid = globally_unique_id(); + object_buffer obj_buffer; + + /* Test for object non-existence. */ + int status = plasma_get_local(plasma_conn, oid, &obj_buffer); + ASSERT(status == false); + + /* Test for the object being in local Plasma store. */ + /* First create object. */ + int64_t data_size = 4; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn, oid, data_size, metadata, metadata_size, &data); + init_data_123(data, data_size, 0); + plasma_seal(plasma_conn, oid); + + sleep(1); + status = plasma_get_local(plasma_conn, oid, &obj_buffer); + ASSERT(status == true); + ASSERT(is_equal_data_123(data, obj_buffer.data, data_size) == true); + + sleep(1); + plasma_disconnect(plasma_conn); + + PASS(); +} + +TEST plasma_wait_for_objects_tests(void) { + plasma_connection *plasma_conn1 = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + plasma_connection *plasma_conn2 = plasma_connect( + "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid1 = globally_unique_id(); + object_id oid2 = globally_unique_id(); +#define NUM_OBJ_REQUEST 2 +#define WAIT_TIMEOUT_MS 1000 + object_request obj_requests[NUM_OBJ_REQUEST]; + + obj_requests[0].object_id = oid1; + obj_requests[0].type = PLASMA_QUERY_ANYWHERE; + obj_requests[1].object_id = oid2; + obj_requests[1].type = PLASMA_QUERY_ANYWHERE; + + struct timeval start, end; + gettimeofday(&start, NULL); + int n = plasma_wait_for_objects(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + ASSERT(n == 0); + gettimeofday(&end, NULL); + float diff_ms = (end.tv_sec - start.tv_sec); + diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; + /* Reduce threshold by 10% to make sure we pass consistently. */ + ASSERT(diff_ms > WAIT_TIMEOUT_MS * 0.9); + + /* Create and insert an object in plasma_conn1. */ + int64_t data_size = 4; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); + plasma_seal(plasma_conn1, oid1); + + sleep(1); + n = plasma_wait_for_objects(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + ASSERT(n == 1); + + /* Create and insert an object in plasma_conn1. */ + plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); + plasma_seal(plasma_conn2, oid2); + + n = plasma_wait_for_objects(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + ASSERT(n == 2); + + n = plasma_wait_for_objects(plasma_conn2, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + ASSERT(n == 2); + + obj_requests[0].type = PLASMA_QUERY_LOCAL; + obj_requests[1].type = PLASMA_QUERY_LOCAL; + n = plasma_wait_for_objects(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + ASSERT(n == 1); + + n = plasma_wait_for_objects(plasma_conn2, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + ASSERT(n == 1); + + sleep(1); + plasma_disconnect(plasma_conn1); + plasma_disconnect(plasma_conn2); + + PASS(); +} + +TEST plasma_get_tests(void) { + plasma_connection *plasma_conn1 = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + plasma_connection *plasma_conn2 = plasma_connect( + "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid1 = globally_unique_id(); + object_id oid2 = globally_unique_id(); + object_buffer obj_buffer; + + int64_t data_size = 4; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); + init_data_123(data, data_size, 1); + plasma_seal(plasma_conn1, oid1); + + plasma_client_get(plasma_conn1, oid1, &obj_buffer); + ASSERT(data[0] == obj_buffer.data[0]); + + plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); + init_data_123(data, data_size, 2); + plasma_seal(plasma_conn2, oid2); + + plasma_client_get(plasma_conn1, oid2, &obj_buffer); + ASSERT(data[0] == obj_buffer.data[0]); + + sleep(1); + plasma_disconnect(plasma_conn1); + plasma_disconnect(plasma_conn2); + + PASS(); +} + +TEST plasma_wait_tests(void) { + plasma_connection *plasma_conn1 = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + plasma_connection *plasma_conn2 = plasma_connect( + "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid1 = globally_unique_id(); + object_id oid2 = globally_unique_id(); + object_id obj_ids[NUM_OBJ_REQUEST]; + object_id return_obj_ids[NUM_OBJ_REQUEST]; + + obj_ids[0] = oid1; + obj_ids[1] = oid2; + + struct timeval start, end; + gettimeofday(&start, NULL); + + int n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, + WAIT_TIMEOUT_MS, 1, return_obj_ids); + + ASSERT(n == 0); + gettimeofday(&end, NULL); + float diff_ms = (end.tv_sec - start.tv_sec); + diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; + /* Reduce threshold by 10% to make sure we pass consistently. */ + ASSERT(diff_ms > WAIT_TIMEOUT_MS * 0.9); + + /* Create and insert an object in plasma_conn1. */ + int64_t data_size = 4; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); + plasma_seal(plasma_conn1, oid1); + + n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, + WAIT_TIMEOUT_MS, 1, return_obj_ids); + ASSERT(n == 1); + ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); + + gettimeofday(&start, NULL); + return_obj_ids[0].id[0] = 0; + n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, + WAIT_TIMEOUT_MS, 2, return_obj_ids); + ASSERT(n == 1); + ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); + gettimeofday(&end, NULL); + diff_ms = (end.tv_sec - start.tv_sec); + diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; + ASSERT(diff_ms > WAIT_TIMEOUT_MS * 0.9); + + /* Create and insert an object in plasma_conn1. */ + plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); + plasma_seal(plasma_conn2, oid2); + + return_obj_ids[0].id[0] = 0; + n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, + WAIT_TIMEOUT_MS, 2, return_obj_ids); + ASSERT(n == 2); + ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); + ASSERT(oid2.id[0] == return_obj_ids[1].id[0]); + + return_obj_ids[0].id[0] = return_obj_ids[1].id[0] = 0; + n = plasma_client_wait(plasma_conn2, NUM_OBJ_REQUEST, obj_ids, + WAIT_TIMEOUT_MS, 2, return_obj_ids); + ASSERT(n == 2); + ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); + ASSERT(oid2.id[0] == return_obj_ids[1].id[0]); + + sleep(1); + plasma_disconnect(plasma_conn1); + plasma_disconnect(plasma_conn2); + + PASS(); +} + +TEST plasma_multiget_tests(void) { + plasma_connection *plasma_conn1 = plasma_connect( + "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); + plasma_connection *plasma_conn2 = plasma_connect( + "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); + object_id oid1 = globally_unique_id(); + object_id oid2 = globally_unique_id(); + object_id obj_ids[NUM_OBJ_REQUEST]; + object_buffer obj_buffer[NUM_OBJ_REQUEST]; + int obj1_first = 1, obj2_first = 2; + + obj_ids[0] = oid1; + obj_ids[1] = oid2; + + int64_t data_size = 4; + uint8_t metadata[] = {5}; + int64_t metadata_size = sizeof(metadata); + uint8_t *data; + plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); + init_data_123(data, data_size, obj1_first); + plasma_seal(plasma_conn1, oid1); + + plasma_client_multiget(plasma_conn1, 1, obj_ids, obj_buffer); + ASSERT(data[0] == obj_buffer[0].data[0]); + + plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); + init_data_123(data, data_size, obj2_first); + plasma_seal(plasma_conn2, oid2); + + plasma_client_multiget(plasma_conn1, 2, obj_ids, obj_buffer); + ASSERT(obj1_first == obj_buffer[0].data[0]); + ASSERT(obj2_first == obj_buffer[1].data[0]); + + sleep(1); + plasma_disconnect(plasma_conn1); + plasma_disconnect(plasma_conn2); + + PASS(); +} + +SUITE(plasma_client_tests) { + RUN_TEST(plasma_status_tests); + RUN_TEST(plasma_fetch_remote_tests); + RUN_TEST(plasma_get_local_tests); + RUN_TEST(plasma_wait_for_objects_tests); + RUN_TEST(plasma_get_tests); + RUN_TEST(plasma_wait_tests); + RUN_TEST(plasma_multiget_tests); +} + +GREATEST_MAIN_DEFS(); + +int main(int argc, char **argv) { + GREATEST_MAIN_BEGIN(); + RUN_SUITE(plasma_client_tests); + GREATEST_MAIN_END(); +} diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index 3da35a9c0..8f03a6ba4 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -168,7 +168,7 @@ TEST request_transfer_test(void) { read_message(read_fd, &type, &length, (uint8_t **) &req); ASSERT(type == PLASMA_TRANSFER); ASSERT(req->num_object_ids == 1); - ASSERT(object_ids_equal(oid, req->object_ids[0])); + ASSERT(object_ids_equal(oid, req->object_requests[0].object_id)); /* Clean up. */ utstring_free(addr); free(req); @@ -214,7 +214,7 @@ TEST request_transfer_retry_test(void) { read_message(read_fd, &type, &length, (uint8_t **) &req); ASSERT(type == PLASMA_TRANSFER); ASSERT(req->num_object_ids == 1); - ASSERT(object_ids_equal(oid, req->object_ids[0])); + ASSERT(object_ids_equal(oid, req->object_requests[0].object_id)); /* Clean up. */ utstring_free(addr0); utstring_free(addr1); @@ -254,7 +254,7 @@ TEST request_transfer_timeout_test(void) { int nbytes = recv(manager_fd, (uint8_t *) &reply, sizeof(reply), MSG_WAITALL); ASSERT_EQ(nbytes, sizeof(reply)); ASSERT_EQ(reply.num_object_ids, 1); - ASSERT(object_ids_equal(oid, reply.object_ids[0])); + ASSERT(object_ids_equal(oid, reply.object_requests[0].object_id)); ASSERT_EQ(reply.has_object, 0); /* Clean up. */ utstring_free(addr); diff --git a/src/plasma/test/run_client_tests.sh b/src/plasma/test/run_client_tests.sh new file mode 100755 index 000000000..5fba13540 --- /dev/null +++ b/src/plasma/test/run_client_tests.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +../common/thirdparty/redis/src/redis-server --loglevel warning & +sleep 1 +# flush the redis server +../common/thirdparty/redis/src/redis-cli flushall & +sleep 1 +./build/plasma_store -s /tmp/store1 -m 1000000000 & +./build/plasma_manager -m /tmp/manager1 -s /tmp/store1 -h 127.0.0.1 -p 11111 -r 127.0.0.1:6379 & +./build/plasma_store -s /tmp/store2 -m 1000000000 & +./build/plasma_manager -m /tmp/manager2 -s /tmp/store2 -h 127.0.0.1 -p 22222 -r 127.0.0.1:6379 & +sleep 1 +./build/client_tests +kill %4 +kill %3 +kill %6 +kill %5 +kill %1