diff --git a/.travis.yml b/.travis.yml
index 464724a27..d592044fe 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -43,6 +43,7 @@ matrix:
install:
- make
+ - make test
script:
- source setup-env.sh
diff --git a/Makefile b/Makefile
index c36d99e02..0ad78eb74 100644
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,7 @@ BUILD = build
all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example $(BUILD)/libplasma_client.a
debug: FORCE
-debug: CFLAGS += -DDEBUG=1
+debug: CFLAGS += -DRAY_COMMON_DEBUG=1
debug: all
clean:
@@ -16,7 +16,7 @@ $(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/fling.h src/fling.c s
$(CC) $(CFLAGS) src/plasma_store.c src/fling.c src/malloc.c common/build/libcommon.a -o $(BUILD)/plasma_store
$(BUILD)/plasma_manager: src/plasma_manager.c src/plasma.h src/plasma_client.c src/fling.h src/fling.c common
- $(CC) $(CFLAGS) src/plasma_manager.c src/plasma_client.c src/fling.c common/build/libcommon.a -o $(BUILD)/plasma_manager
+ $(CC) $(CFLAGS) src/plasma_manager.c src/plasma_client.c src/fling.c common/build/libcommon.a common/thirdparty/hiredis/libhiredis.a -o $(BUILD)/plasma_manager
$(BUILD)/plasma_client.so: src/plasma_client.c src/fling.h src/fling.c common
$(CC) $(CFLAGS) src/plasma_client.c src/fling.c common/build/libcommon.a -fPIC -shared -o $(BUILD)/plasma_client.so
@@ -31,4 +31,10 @@ common: FORCE
git submodule update --init --recursive
cd common; make
+# Set the request timeout low for testing purposes.
+test: CFLAGS += -DRAY_TIMEOUT=50
+test: FORCE
+ cd common; make redis
+test: all
+
FORCE:
diff --git a/common b/common
index f4037ad19..da3a3127e 160000
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit f4037ad19f38dc68b186c9338d3f67c9058c556c
+Subproject commit da3a3127e095f679651119f0debfafcade1b0b94
diff --git a/lib/python/plasma.py b/lib/python/plasma.py
index 347715118..8200471d5 100644
--- a/lib/python/plasma.py
+++ b/lib/python/plasma.py
@@ -42,7 +42,7 @@ class PlasmaClient(object):
plasma_client_library = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../../build/plasma_client.so")
self.client = ctypes.cdll.LoadLibrary(plasma_client_library)
- self.client.plasma_store_connect.restype = ctypes.c_void_p
+ self.client.plasma_connect.restype = ctypes.c_void_p
self.client.plasma_create.restype = None
self.client.plasma_get.restype = None
self.client.plasma_contains.restype = None
@@ -58,12 +58,12 @@ class PlasmaClient(object):
self.buffer_from_read_write_memory.argtypes = [ctypes.c_void_p, ctypes.c_int64]
self.buffer_from_read_write_memory.restype = ctypes.py_object
- self.store_conn = ctypes.c_void_p(self.client.plasma_store_connect(socket_name))
-
if addr is not None and port is not None:
- self.manager_conn = self.client.plasma_manager_connect(addr, port)
+ self.has_manager_conn = True
+ self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(socket_name, addr, port))
else:
- self.manager_conn = -1 # not connected
+ self.has_manager_conn = False
+ self.plasma_conn = ctypes.c_void_p(self.client.plasma_connect(socket_name, None, 0))
def create(self, object_id, size, metadata=None):
"""Create a new buffer in the PlasmaStore for a particular object ID.
@@ -81,7 +81,7 @@ class PlasmaClient(object):
# Turn the metadata into the right type.
metadata = buffer("") if metadata is None else metadata
metadata = (ctypes.c_ubyte * len(metadata)).from_buffer_copy(metadata)
- self.client.plasma_create(self.store_conn, make_plasma_id(object_id), size, ctypes.cast(metadata, ctypes.POINTER(ctypes.c_ubyte * len(metadata))), len(metadata), ctypes.byref(data))
+ self.client.plasma_create(self.plasma_conn, make_plasma_id(object_id), size, ctypes.cast(metadata, ctypes.POINTER(ctypes.c_ubyte * len(metadata))), len(metadata), ctypes.byref(data))
return self.buffer_from_read_write_memory(data, size)
def get(self, object_id):
@@ -97,7 +97,7 @@ class PlasmaClient(object):
data = ctypes.c_void_p()
metadata_size = ctypes.c_int64()
metadata = ctypes.c_void_p()
- buf = self.client.plasma_get(self.store_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata))
+ buf = self.client.plasma_get(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata))
return self.buffer_from_memory(data, size)
def get_metadata(self, object_id):
@@ -113,7 +113,7 @@ class PlasmaClient(object):
data = ctypes.c_void_p()
metadata_size = ctypes.c_int64()
metadata = ctypes.c_void_p()
- buf = self.client.plasma_get(self.store_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata))
+ buf = self.client.plasma_get(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(size), ctypes.byref(data), ctypes.byref(metadata_size), ctypes.byref(metadata))
return self.buffer_from_memory(metadata, metadata_size)
def contains(self, object_id):
@@ -123,7 +123,7 @@ class PlasmaClient(object):
object_id (str): A string used to identify an object.
"""
has_object = ctypes.c_int()
- self.client.plasma_contains(self.store_conn, make_plasma_id(object_id), ctypes.byref(has_object))
+ self.client.plasma_contains(self.plasma_conn, make_plasma_id(object_id), ctypes.byref(has_object))
has_object = has_object.value
if has_object == 1:
return True
@@ -141,7 +141,7 @@ class PlasmaClient(object):
Args:
object_id (str): A string used to identify an object.
"""
- self.client.plasma_seal(self.store_conn, make_plasma_id(object_id))
+ self.client.plasma_seal(self.plasma_conn, make_plasma_id(object_id))
def delete(self, object_id):
"""Delete the buffer in the PlasmaStore for a particular object ID.
@@ -151,7 +151,7 @@ class PlasmaClient(object):
Args:
object_id (str): A string used to identify an object.
"""
- self.client.plasma_delete(self.store_conn, make_plasma_id(object_id))
+ self.client.plasma_delete(self.plasma_conn, make_plasma_id(object_id))
def transfer(self, addr, port, object_id):
"""Transfer local object with id object_id to another plasma instance
@@ -161,13 +161,31 @@ class PlasmaClient(object):
port (int): Port number of the plasma instance the object is sent to.
object_id (str): A string used to identify an object.
"""
- if self.manager_conn == -1:
+ if not self.has_manager_conn:
raise Exception("Not connected to the plasma manager socket")
- self.client.plasma_transfer(self.manager_conn, addr, port, make_plasma_id(object_id))
+ self.client.plasma_transfer(self.plasma_conn, addr, port, make_plasma_id(object_id))
+
+ def fetch(self, object_ids):
+ """Fetch the objects with ids object_ids from other plasma manager instances.
+
+ Args:
+ object_ids (List[str]): A list of strings used to identify the objects.
+ """
+ object_id_array = (len(object_ids) * PlasmaID)()
+ for i, object_id in enumerate(object_ids):
+ object_id_array[i] = make_plasma_id(object_id)
+ success_array = (len(object_ids) * ctypes.c_int)()
+ if not self.has_manager_conn:
+ raise Exception("Not connected to the plasma manager socket")
+ self.client.plasma_fetch(self.plasma_conn,
+ object_id_array._length_,
+ object_id_array,
+ success_array);
+ return [bool(success) for success in success_array]
def subscribe(self):
"""Subscribe to notifications about sealed objects."""
- fd = self.client.plasma_subscribe(self.store_conn)
+ fd = self.client.plasma_subscribe(self.plasma_conn)
self.notification_sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
# Make the socket non-blocking.
self.notification_sock.setblocking(0)
diff --git a/src/example.c b/src/example.c
index 81763dac7..922d10583 100644
--- a/src/example.c
+++ b/src/example.c
@@ -16,7 +16,7 @@
#include "plasma_client.h"
int main(int argc, char *argv[]) {
- plasma_store_conn *conn = NULL;
+ plasma_connection *conn = NULL;
int64_t size;
uint8_t *data;
int c;
@@ -25,7 +25,7 @@ int main(int argc, char *argv[]) {
while ((c = getopt(argc, argv, "s:cfg")) != -1) {
switch (c) {
case 's':
- conn = plasma_store_connect(optarg);
+ conn = plasma_connect(optarg, NULL, 0);
break;
case 'c':
assert(conn != NULL);
@@ -43,5 +43,5 @@ int main(int argc, char *argv[]) {
}
}
assert(conn != NULL);
- plasma_store_disconnect(conn);
+ plasma_disconnect(conn);
}
diff --git a/src/plasma.h b/src/plasma.h
index 8b6852f65..35e4f0db3 100644
--- a/src/plasma.h
+++ b/src/plasma.h
@@ -58,11 +58,11 @@ enum plasma_message_type {
PLASMA_TRANSFER,
/** Header for sending data. */
PLASMA_DATA,
+ /** Request a fetch of an object in another store. */
+ PLASMA_FETCH,
};
typedef struct {
- /** The ID of the object that the request is about. */
- object_id object_id;
/** The size of the object's data. */
int64_t data_size;
/** The size of the object's metadata. */
@@ -73,13 +73,21 @@ typedef struct {
/** In a transfer request, this is the port of the Plasma Manager to transfer
* the object to. */
int port;
+ /** The number of object IDs that will be included in this request. */
+ int num_object_ids;
+ /** The IDs of the objects that the request is about. */
+ object_id object_ids[1];
} plasma_request;
typedef struct {
+ /** The object ID that this reply refers to. */
+ object_id object_id;
/** The object that is returned with this reply. */
plasma_object object;
- /** This is used only to respond to requests of type PLASMA_CONTAINS. It is 1
- * if the object is present and 0 otherwise. Used for plasma_contains. */
+ /** This is used only to respond to requests of type
+ * PLASMA_CONTAINS or PLASMA_FETCH. It is 1 if the object is
+ * present and 0 otherwise. Used for plasma_contains and
+ * plasma_fetch. */
int has_object;
} plasma_reply;
diff --git a/src/plasma_client.c b/src/plasma_client.c
index 1cae73607..b01204a1c 100644
--- a/src/plasma_client.c
+++ b/src/plasma_client.c
@@ -32,22 +32,45 @@ typedef struct {
/** Information about a connection between a Plasma Client and Plasma Store.
* This is used to avoid mapping the same files into memory multiple times. */
-struct plasma_store_conn {
+struct plasma_connection {
/** File descriptor of the Unix domain socket that connects to the store. */
- int conn;
+ int store_conn;
+ /** File descriptor of the Unix domain socket that connects to the manager. */
+ int manager_conn;
/** Table of dlmalloc buffer files that have been memory mapped so far. */
client_mmap_table_entry *mmap_table;
};
+int plasma_request_size(int num_object_ids) {
+ int object_ids_size = (num_object_ids - 1) * sizeof(object_id);
+ return sizeof(plasma_request) + object_ids_size;
+}
+
void plasma_send_request(int fd, int type, plasma_request *req) {
- int req_count = sizeof(plasma_request);
- write_message(fd, type, req_count, (uint8_t *) req);
+ int req_size = plasma_request_size(req->num_object_ids);
+ int error = write_message(fd, type, req_size, (uint8_t *) req);
+ /* TODO(swang): Actually handle the write error. */
+ CHECK(!error);
+}
+
+plasma_request make_plasma_request(object_id object_id) {
+ plasma_request req = {.num_object_ids = 1, .object_ids = {object_id}};
+ return req;
+}
+
+plasma_request *make_plasma_multiple_request(int num_object_ids,
+ object_id object_ids[]) {
+ int req_size = plasma_request_size(num_object_ids);
+ plasma_request *req = malloc(req_size);
+ req->num_object_ids = num_object_ids;
+ memcpy(&req->object_ids, object_ids, num_object_ids * sizeof(object_id));
+ return req;
}
/* If the file descriptor fd has been mmapped in this client process before,
* return the pointer that was returned by mmap, otherwise mmap it and store the
* pointer in a hash table. */
-uint8_t *lookup_or_mmap(plasma_store_conn *conn,
+uint8_t *lookup_or_mmap(plasma_connection *conn,
int fd,
int store_fd_val,
int64_t map_size) {
@@ -72,7 +95,7 @@ uint8_t *lookup_or_mmap(plasma_store_conn *conn,
}
}
-void plasma_create(plasma_store_conn *conn,
+void plasma_create(plasma_connection *conn,
object_id object_id,
int64_t data_size,
uint8_t *metadata,
@@ -81,13 +104,13 @@ void plasma_create(plasma_store_conn *conn,
LOG_DEBUG("called plasma_create on conn %d with size %" PRId64
" and metadata size "
"%" PRId64,
- conn->conn, data_size, metadata_size);
- plasma_request req = {.object_id = object_id,
- .data_size = data_size,
- .metadata_size = metadata_size};
- plasma_send_request(conn->conn, PLASMA_CREATE, &req);
+ conn->store_conn, data_size, metadata_size);
+ plasma_request req = make_plasma_request(object_id);
+ req.data_size = data_size;
+ req.metadata_size = metadata_size;
+ plasma_send_request(conn->store_conn, PLASMA_CREATE, &req);
plasma_reply reply;
- int fd = recv_fd(conn->conn, (char *) &reply, sizeof(plasma_reply));
+ int fd = recv_fd(conn->store_conn, (char *) &reply, sizeof(plasma_reply));
plasma_object *object = &reply.object;
CHECK(object->data_size == data_size);
CHECK(object->metadata_size == metadata_size);
@@ -106,16 +129,16 @@ void plasma_create(plasma_store_conn *conn,
}
/* This method is used to get both the data and the metadata. */
-void plasma_get(plasma_store_conn *conn,
+void plasma_get(plasma_connection *conn,
object_id object_id,
int64_t *size,
uint8_t **data,
int64_t *metadata_size,
uint8_t **metadata) {
- plasma_request req = {.object_id = object_id};
- plasma_send_request(conn->conn, PLASMA_GET, &req);
+ plasma_request req = make_plasma_request(object_id);
+ plasma_send_request(conn->store_conn, PLASMA_GET, &req);
plasma_reply reply;
- int fd = recv_fd(conn->conn, (char *) &reply, sizeof(plasma_reply));
+ int fd = recv_fd(conn->store_conn, (char *) &reply, sizeof(plasma_reply));
CHECKM(fd != -1, "recv not successful");
plasma_object *object = &reply.object;
*data = lookup_or_mmap(conn, fd, object->handle.store_fd,
@@ -130,29 +153,32 @@ void plasma_get(plasma_store_conn *conn,
}
/* This method is used to query whether the plasma store contains an object. */
-void plasma_contains(plasma_store_conn *conn,
+void plasma_contains(plasma_connection *conn,
object_id object_id,
int *has_object) {
- plasma_request req = {.object_id = object_id};
- plasma_send_request(conn->conn, PLASMA_CONTAINS, &req);
+ plasma_request req = make_plasma_request(object_id);
+ plasma_send_request(conn->store_conn, PLASMA_CONTAINS, &req);
plasma_reply reply;
- int r = read(conn->conn, &reply, sizeof(plasma_reply));
+ int r = read(conn->store_conn, &reply, sizeof(plasma_reply));
CHECKM(r != -1, "read error");
CHECKM(r != 0, "connection disconnected");
*has_object = reply.has_object;
}
-void plasma_seal(plasma_store_conn *conn, object_id object_id) {
- plasma_request req = {.object_id = object_id};
- plasma_send_request(conn->conn, PLASMA_SEAL, &req);
+void plasma_seal(plasma_connection *conn, object_id object_id) {
+ plasma_request req = make_plasma_request(object_id);
+ plasma_send_request(conn->store_conn, PLASMA_SEAL, &req);
+ if (conn->manager_conn >= 0) {
+ plasma_send_request(conn->manager_conn, PLASMA_SEAL, &req);
+ }
}
-void plasma_delete(plasma_store_conn *conn, object_id object_id) {
- plasma_request req = {.object_id = object_id};
- plasma_send_request(conn->conn, PLASMA_DELETE, &req);
+void plasma_delete(plasma_connection *conn, object_id object_id) {
+ plasma_request req = make_plasma_request(object_id);
+ plasma_send_request(conn->store_conn, PLASMA_DELETE, &req);
}
-int plasma_subscribe(plasma_store_conn *conn) {
+int plasma_subscribe(plasma_connection *conn) {
int fd[2];
/* Create a non-blocking socket pair. This will only be used to send
* notifications from the Plasma store to the client. */
@@ -162,25 +188,27 @@ int plasma_subscribe(plasma_store_conn *conn) {
CHECK(fcntl(fd[1], F_SETFL, flags | O_NONBLOCK) == 0);
/* Tell the Plasma store about the subscription. */
plasma_request req = {};
- plasma_send_request(conn->conn, PLASMA_SUBSCRIBE, &req);
+ plasma_send_request(conn->store_conn, PLASMA_SUBSCRIBE, &req);
/* Send the file descriptor that the Plasma store should use to push
* notifications about sealed objects to this client. We include a one byte
* message because otherwise it seems to hang on Linux. */
char dummy = '\0';
- send_fd(conn->conn, fd[1], &dummy, 1);
+ send_fd(conn->store_conn, fd[1], &dummy, 1);
/* Return the file descriptor that the client should use to read notifications
* about sealed objects. */
return fd[0];
}
-plasma_store_conn *plasma_store_connect(const char *socket_name) {
- assert(socket_name);
+plasma_connection *plasma_connect(const char *store_socket_name,
+ const char *manager_addr,
+ int manager_port) {
+ CHECK(store_socket_name);
/* Try to connect to the Plasma store. If unsuccessful, retry several times.
*/
int fd = -1;
int connected_successfully = 0;
for (int num_attempts = 0; num_attempts < 50; ++num_attempts) {
- fd = connect_ipc_sock(socket_name);
+ fd = connect_ipc_sock(store_socket_name);
if (fd >= 0) {
connected_successfully = 1;
break;
@@ -190,23 +218,32 @@ plasma_store_conn *plasma_store_connect(const char *socket_name) {
}
/* If we could not connect to the Plasma store, exit. */
if (!connected_successfully) {
- LOG_ERR("could not connect to store %s", socket_name);
+ LOG_ERR("could not connect to store %s", store_socket_name);
exit(-1);
}
/* Initialize the store connection struct */
- plasma_store_conn *result = malloc(sizeof(plasma_store_conn));
- result->conn = fd;
+ plasma_connection *result = malloc(sizeof(plasma_connection));
+ result->store_conn = fd;
+ if (manager_addr != NULL) {
+ result->manager_conn = plasma_manager_connect(manager_addr, manager_port);
+ } else {
+ result->manager_conn = -1;
+ }
result->mmap_table = NULL;
return result;
}
-void plasma_store_disconnect(plasma_store_conn *conn) {
- close(conn->conn);
+void plasma_disconnect(plasma_connection *conn) {
+ close(conn->store_conn);
+ if (conn->manager_conn >= 0) {
+ close(conn->manager_conn);
+ }
free(conn);
}
#define h_addr h_addr_list[0]
+/* TODO(swang): Return the error to the caller. */
int plasma_manager_connect(const char *ip_addr, int port) {
int fd = socket(PF_INET, SOCK_STREAM, 0);
if (fd < 0) {
@@ -236,16 +273,57 @@ int plasma_manager_connect(const char *ip_addr, int port) {
return fd;
}
-void plasma_transfer(int manager,
+void plasma_transfer(plasma_connection *conn,
const char *addr,
int port,
object_id object_id) {
- plasma_request req = {.object_id = object_id, .port = port};
+ plasma_request req = make_plasma_request(object_id);
+ req.port = port;
char *end = NULL;
for (int i = 0; i < 4; ++i) {
req.addr[i] = strtol(end ? end : addr, &end, 10);
/* skip the '.' */
end += 1;
}
- plasma_send_request(manager, PLASMA_TRANSFER, &req);
+ plasma_send_request(conn->manager_conn, PLASMA_TRANSFER, &req);
+}
+
+void plasma_fetch(plasma_connection *conn,
+ int num_object_ids,
+ object_id object_ids[],
+ int is_fetched[]) {
+ CHECK(conn->manager_conn >= 0);
+ plasma_request *req =
+ make_plasma_multiple_request(num_object_ids, object_ids);
+ LOG_DEBUG("Requesting fetch");
+ plasma_send_request(conn->manager_conn, PLASMA_FETCH, req);
+ free(req);
+
+ plasma_reply reply;
+ int nbytes, success;
+ for (int received = 0; received < num_object_ids; ++received) {
+ nbytes = recv(conn->manager_conn, (uint8_t *) &reply, sizeof(reply),
+ MSG_WAITALL);
+ if (nbytes < 0) {
+ LOG_ERR("Error while waiting for manager response in fetch");
+ success = 0;
+ } else if (nbytes == 0) {
+ success = 0;
+ } else {
+ CHECK(nbytes == sizeof(reply));
+ success = reply.has_object;
+ }
+ /* Update the correct index in is_fetched. */
+ int i = 0;
+ for (; i < num_object_ids; i++) {
+ if (memcmp(&object_ids[i], &reply.object_id, sizeof(object_id)) == 0) {
+ /* Check that this isn't a duplicate response. */
+ CHECK(!is_fetched[i]);
+ is_fetched[i] = success;
+ break;
+ }
+ }
+ CHECKM(i != num_object_ids,
+ "Received unexpected object ID from manager during fetch.");
+ }
}
diff --git a/src/plasma_client.h b/src/plasma_client.h
index 36ecb1061..9468397cc 100644
--- a/src/plasma_client.h
+++ b/src/plasma_client.h
@@ -3,7 +3,7 @@
#include "plasma.h"
-typedef struct plasma_store_conn plasma_store_conn;
+typedef struct plasma_connection plasma_connection;
/**
* This is used by the Plasma Client to send a request to the Plasma Store or
@@ -14,25 +14,52 @@ typedef struct plasma_store_conn plasma_store_conn;
* @param req The address of the request to send.
* @return Void.
*/
-void plasma_send_request(int conn, int type, plasma_request *req);
+void plasma_send_request(int fd, int type, plasma_request *req);
/**
- * Connect to the local plasma store UNIX domain socket with path socket_name
- * and return the resulting connection.
+ * Create a plasma request to be sent with a single object ID.
*
- * @param socket_name The name of the socket to use to connect to the Plasma
- * Store.
+ * @param object_id The object ID to include in the request.
+ * @return The plasma request.
+ */
+plasma_request make_plasma_request(object_id object_id);
+
+/**
+ * Create a plasma request to be sent with multiple object IDs. Caller must
+ * free the returned plasma request pointer.
+ *
+ * @param num_object_ids The number of object IDs to include in the request.
+ * @param object_ids The array of object IDs to include in the request. It must
+ * have length at least equal to num_object_ids.
+ * @return A pointer to the newly created plasma request.
+ */
+plasma_request *make_plasma_multiple_request(int num_object_ids,
+ object_id object_ids[]);
+
+/**
+ * Connect to the local plasma store and plasma manager. Return
+ * the resulting connection.
+ *
+ * @param store_socket_name The name of the UNIX domain socket to use
+ * to connect to the Plasma Store.
+ * @param manager_addr The IP address of the plasma manager to
+ * connect to. If NULL, no manager connection is made.
+ * @param manager_port The port of the plasma manager to connect
+ * to.
* @return The object containing the connection state.
*/
-plasma_store_conn *plasma_store_connect(const char *socket_name);
+plasma_connection *plasma_connect(const char *store_socket_name,
+ const char *manager_addr,
+ int manager_port);
/**
- * Disconnect from the local plasma store.
+ * Disconnect from the local plasma instance, including the local store and
+ * manager.
*
- * @param conn The connection to the local plasma store.
+ * @param conn The connection to the local plasma store and plasma manager.
* @return Void.
*/
-void plasma_store_disconnect(plasma_store_conn *conn);
+void plasma_disconnect(plasma_connection *conn);
/**
* Connect to a possibly remote Plasma Manager.
@@ -58,7 +85,7 @@ int plasma_manager_connect(const char *addr, int port);
* @param data The address of the newly created object will be written here.
* @return Void.
*/
-void plasma_create(plasma_store_conn *conn,
+void plasma_create(plasma_connection *conn,
object_id object_id,
int64_t size,
uint8_t *metadata,
@@ -80,7 +107,7 @@ void plasma_create(plasma_store_conn *conn,
* address.
* @return Void.
*/
-void plasma_get(plasma_store_conn *conn,
+void plasma_get(plasma_connection *conn,
object_id object_id,
int64_t *size,
uint8_t **data,
@@ -99,7 +126,7 @@ void plasma_get(plasma_store_conn *conn,
* present and 0 if it is not present.
* @return Void.
*/
-void plasma_contains(plasma_store_conn *conn,
+void plasma_contains(plasma_connection *conn,
object_id object_id,
int *has_object);
@@ -111,7 +138,7 @@ void plasma_contains(plasma_store_conn *conn,
* @param object_id The ID of the object to seal.
* @return Void.
*/
-void plasma_seal(plasma_store_conn *conn, object_id object_id);
+void plasma_seal(plasma_connection *conn, object_id object_id);
/**
* Delete an object from the object store. This currently assumes that the
@@ -124,7 +151,26 @@ void plasma_seal(plasma_store_conn *conn, object_id object_id);
* @param object_id The ID of the object to delete.
* @return Void.
*/
-void plasma_delete(plasma_store_conn *conn, object_id object_id);
+void plasma_delete(plasma_connection *conn, object_id object_id);
+
+/**
+ * Fetch objects from remote plasma stores that have the
+ * objects stored.
+ *
+ * @param conn The connection to the local plasma store and plasma manager.
+ * The manager connection must be open.
+ * @param num_object_ids The number of object IDs requested.
+ * @param object_ids[] The vector of object IDs requested. Length must be at
+ * least num_object_ids.
+ * @param is_fetched[] The vector in which to return the success of each
+ * object's fetch operation, in the same order as object_ids. Length must be
+ * at least num_object_ids, and the caller must initialize every entry to 0.
+ * @return Void.
+ */
+void plasma_fetch(plasma_connection *conn,
+ int num_object_ids,
+ object_id object_ids[],
+ int is_fetched[]);
/**
* Subscribe to notifications when objects are sealed in the object store.
@@ -135,6 +181,6 @@ void plasma_delete(plasma_store_conn *conn, object_id object_id);
* @return The file descriptor that the client should use to read notifications
from the object store about sealed objects.
*/
-int plasma_subscribe(plasma_store_conn *conn);
+int plasma_subscribe(plasma_connection *conn);
#endif
diff --git a/src/plasma_manager.c b/src/plasma_manager.c
index c7bdc7627..3dfe21735 100644
--- a/src/plasma_manager.c
+++ b/src/plasma_manager.c
@@ -23,6 +23,7 @@
#include "uthash.h"
#include "utlist.h"
+#include "utarray.h"
#include "utstring.h"
#include "common.h"
#include "io.h"
@@ -30,56 +31,255 @@
#include "plasma.h"
#include "plasma_client.h"
#include "plasma_manager.h"
+#include "state/db.h"
+#include "state/object_table.h"
+
+#define NUM_RETRIES 5
+
+/* Timeouts are in milliseconds. */
+#ifndef RAY_TIMEOUT
+#define MANAGER_TIMEOUT 1000
+#else
+#define MANAGER_TIMEOUT RAY_TIMEOUT
+#endif
+
+typedef struct client_object_connection client_object_connection;
typedef struct {
+ /** Event loop. */
+ event_loop *loop;
/** Connection to the local plasma store for reading or writing data. */
- plasma_store_conn *store_conn;
- /** Hash table of all contexts for active connections to other plasma
- * managers. These are used for writing data to other plasma stores. */
+ plasma_connection *plasma_conn;
+ /** Hash table of all contexts for active connections to
+ * other plasma managers. These are used for writing data to
+ * other plasma stores. */
client_connection *manager_connections;
+ db_handle *db;
+ /** Our address. */
+ uint8_t addr[4];
+ /** Our port. */
+ int port;
+ /** Hash table of outstanding fetch requests. The key is
+ * object id, value is a list of connections to the clients
+ * who are blocking on a fetch of this object. */
+ client_object_connection *fetch_connections;
} plasma_manager_state;
-typedef struct plasma_buffer plasma_buffer;
+plasma_manager_state *g_manager_state = NULL;
-/* Buffer for reading and writing data between plasma managers. */
-struct plasma_buffer {
+typedef struct plasma_request_buffer plasma_request_buffer;
+
+/* Buffer for requests between plasma managers. */
+struct plasma_request_buffer {
+ int type;
object_id object_id;
uint8_t *data;
int64_t data_size;
uint8_t *metadata;
int64_t metadata_size;
- int writable;
/* Pointer to the next buffer that we will write to this plasma manager. This
- * field is only used if we're transferring data to another plasma manager,
+ * field is only used if we're pushing requests to another plasma manager,
* not if we are receiving data. */
- plasma_buffer *next;
+ plasma_request_buffer *next;
+};
+
+/* The context for fetch and wait requests. These are per client, per object. */
+struct client_object_connection {
+ /** The ID of the object we are fetching or waiting for. */
+ object_id object_id;
+ /** The client connection context, shared between other
+ * client_object_connections for the same client. */
+ client_connection *client_conn;
+ /** The ID for the timer that will time out the current request to the state
+ * database or another plasma manager. */
+ int64_t timer;
+ /** How many retries we have left for the request. Decremented on every
+ * timeout. */
+ int num_retries;
+ /** Handle for a linked list. */
+ client_object_connection *next;
+ /** Pointer to the array containing the manager locations of
+ * this object. */
+ char **manager_vector;
+ /** The number of manager locations in the array manager_vector. */
+ int manager_count;
+ /** Handle for the uthash table in the client connection
+ * context that keeps track of active object connection
+ * contexts. */
+ UT_hash_handle active_hh;
+ /** Handle for the uthash table in the manager state that
+ * keeps track of outstanding fetch requests. */
+ UT_hash_handle fetch_hh;
};
/* Context for a client connection to another plasma manager. */
struct client_connection {
- /* Current state for this plasma manager. This is shared between all client
- * connections to the plasma manager. */
+ /** Current state for this plasma manager. This is shared
+ * between all client connections to the plasma manager. */
plasma_manager_state *manager_state;
- /* Current position in the buffer. */
+ /** Current position in the buffer. */
int64_t cursor;
- /* Buffer that this connection is reading from. If this is a connection to
- * write data to another plasma store, then it is a linked list of buffers to
- * write. */
- plasma_buffer *transfer_queue;
- /* File descriptor for the socket connected to the other plasma manager. */
+ /** Buffer that this connection is reading from. If this is a connection to
+ * write data to another plasma store, then it is a linked
+ * list of buffers to write. */
+ /* TODO(swang): Split into two queues, data transfers and data requests. */
+ plasma_request_buffer *transfer_queue;
+ /** File descriptor for the socket connected to the other
+ * plasma manager. */
int fd;
- /* Following fields are used only for connections to plasma managers. */
- /* Key that uniquely identifies the plasma manager that we're connected to.
- * We will use the string <address>:<port> as an identifier. */
+ /** The objects that we are waiting for and their callback
+ * contexts, for either a fetch or a wait operation. */
+ client_object_connection *active_objects;
+ /** The number of objects that we have left to return for
+ * this fetch or wait operation. */
+ int num_return_objects;
+ /** Fields specific to connections to plasma managers. Key that uniquely
+ * identifies the plasma manager that we're connected to. We will use the
+ * string <address>:<port> as an identifier. */
char *ip_addr_port;
/** Handle for the uthash table. */
UT_hash_handle hh;
};
-plasma_manager_state *init_plasma_manager_state(const char *store_socket_name) {
+/**
+ * Release an object connection context together with the manager address
+ * strings stored in its manager vector.
+ *
+ * @param object_conn The object connection context to free.
+ * @return Void.
+ */
+void free_client_object_connection(client_object_connection *object_conn) {
+  /* Free every string held in the vector before the vector itself. */
+  int remaining = object_conn->manager_count;
+  while (remaining > 0) {
+    --remaining;
+    free(object_conn->manager_vector[remaining]);
+  }
+  free(object_conn->manager_vector);
+  free(object_conn);
+}
+
+/**
+ * Write one plasma_reply to the client's socket and account for it in the
+ * client's count of outstanding replies.
+ *
+ * @param conn The client connection context to reply on.
+ * @param reply The reply to send.
+ * @return 0 if a single write call wrote the whole reply, 1 otherwise.
+ */
+int send_client_reply(client_connection *conn, plasma_reply *reply) {
+  /* One fewer reply is owed to this client; the count must not go negative. */
+  --conn->num_return_objects;
+  CHECK(conn->num_return_objects >= 0);
+  /* TODO(swang): Handle errors in write. */
+  const size_t reply_size = sizeof(plasma_reply);
+  int written = write(conn->fd, (uint8_t *) reply, reply_size);
+  if (written != reply_size) {
+    return 1;
+  }
+  return 0;
+}
+
+/**
+ * Look up the active object context that a client connection holds for an
+ * object ID.
+ *
+ * @param client_conn The client connection context to search.
+ * @param object_id The object ID whose context we want.
+ * @return A pointer to the matching active object context, or NULL if the
+ *         client has none registered for this object ID.
+ */
+client_object_connection *get_object_connection(client_connection *client_conn,
+                                                object_id object_id) {
+  client_object_connection *match = NULL;
+  /* The client's table of active contexts is keyed by object ID through the
+   * active_hh handle. */
+  HASH_FIND(active_hh, client_conn->active_objects, &object_id,
+            sizeof(object_id), match);
+  return match;
+}
+
+/**
+ * Allocate a context tying the given client connection to the given object
+ * ID, then register it in two places: the client connection's table of active
+ * object contexts, and the manager state's table of outstanding fetch
+ * requests.
+ *
+ * @param client_conn The client connection context.
+ * @param object_id The object ID to create a context for.
+ * @return A pointer to the newly created object context, or NULL if it could
+ *         not be allocated.
+ */
+client_object_connection *add_object_connection(client_connection *client_conn,
+                                                object_id object_id) {
+  /* TODO(swang): Support registration of wait operations. */
+  client_object_connection *new_conn =
+      malloc(sizeof(client_object_connection));
+  if (new_conn == NULL) {
+    return NULL;
+  }
+  /* The list of candidate managers starts out empty. */
+  new_conn->manager_vector = NULL;
+  new_conn->manager_count = 0;
+  new_conn->client_conn = client_conn;
+  new_conn->object_id = object_id;
+  /* Index the new context by object ID in the client's own table. */
+  HASH_ADD(active_hh, client_conn->active_objects, object_id, sizeof(object_id),
+           new_conn);
+  /* The manager-wide table maps an object ID to the head of a linked list of
+   * contexts waiting on that object. */
+  client_object_connection *waiters;
+  HASH_FIND(fetch_hh, client_conn->manager_state->fetch_connections, &object_id,
+            sizeof(object_id), waiters);
+  LOG_DEBUG("Registering fd %d for fetch.", client_conn->fd);
+  int first_waiter = (waiters == NULL);
+  LL_APPEND(waiters, new_conn);
+  if (first_waiter) {
+    /* Nobody was waiting on this object yet, so the new context is the list
+     * head and has to be entered into the table. */
+    HASH_ADD(fetch_hh, client_conn->manager_state->fetch_connections, object_id,
+             sizeof(object_id), waiters);
+  }
+  return new_conn;
+}
+
+/**
+ * Clean up and free an active object context. Deregister it from the
+ * associated client connection and from the manager state.
+ *
+ * @param client_conn The client connection context.
+ * @param object_conn The active object context to deregister and free.
+ * @return Void.
+ */
+void remove_object_connection(client_connection *client_conn,
+                              client_object_connection *object_conn) {
+  /* Deregister the object context with the client context. */
+  HASH_DELETE(active_hh, client_conn->active_objects, object_conn);
+  /* Deregister the object context with the manager state. The manager's table
+   * stores the head of a linked list of contexts waiting on each object. */
+  client_object_connection *object_conns;
+  HASH_FIND(fetch_hh, client_conn->manager_state->fetch_connections,
+            &(object_conn->object_id), sizeof(object_conn->object_id),
+            object_conns);
+  CHECK(object_conns);
+  if (object_conns == object_conn) {
+    /* We are removing the list head, which is the element the hash table
+     * refers to. Take it out of the table and, if other contexts are still
+     * waiting on this object, register the next one as the new head so the
+     * table never points at freed memory. */
+    client_object_connection *new_head = object_conn->next;
+    HASH_DELETE(fetch_hh, client_conn->manager_state->fetch_connections,
+                object_conn);
+    if (new_head) {
+      HASH_ADD(fetch_hh, client_conn->manager_state->fetch_connections,
+               object_id, sizeof(new_head->object_id), new_head);
+    }
+  } else {
+    /* The head is unchanged; just unlink this context from the list. */
+    LL_DELETE(object_conns, object_conn);
+  }
+  /* Free the object. */
+  free_client_object_connection(object_conn);
+}
+
+/* Helper function that splits a string of the form <IP address>:<port> into
+ * its two parts. The dotted address is copied into ip_addr, which must already
+ * be allocated with room for at least 16 bytes, and the numeric port is stored
+ * through the port pointer. */
+/* TODO(swang): Move this function to Ray common. */
+void parse_ip_addr_port(const char *ip_addr_port, char *ip_addr, int *port) {
+  /* At most five digits plus the terminating null byte. */
+  char port_digits[6];
+  int num_fields =
+      sscanf(ip_addr_port, "%15[0-9.]:%5[0-9]", ip_addr, port_digits);
+  CHECK(num_fields == 2);
+  *port = atoi(port_digits);
+}
+
+plasma_manager_state *init_plasma_manager_state(const char *store_socket_name,
+ const char *manager_addr,
+ int manager_port,
+ const char *db_addr,
+ int db_port) {
plasma_manager_state *state = malloc(sizeof(plasma_manager_state));
- state->store_conn = plasma_store_connect(store_socket_name);
+ state->loop = event_loop_create();
+ state->plasma_conn = plasma_connect(store_socket_name, NULL, 0);
state->manager_connections = NULL;
+ state->fetch_connections = NULL;
+ if (db_addr) {
+ state->db = db_connect(db_addr, db_port, "plasma_manager", manager_addr,
+ manager_port);
+ db_attach(state->db, state->loop);
+ LOG_DEBUG("Connected to db at %s:%d, assigned client ID %d", db_addr,
+ db_port, get_client_id(state->db));
+ } else {
+ state->db = NULL;
+ LOG_DEBUG("No db connection specified");
+ }
+ sscanf(manager_addr, "%hhu.%hhu.%hhu.%hhu", &state->addr[0], &state->addr[1],
+ &state->addr[2], &state->addr[3]);
+ state->port = manager_port;
return state;
}
@@ -90,31 +290,8 @@ void process_message(event_loop *loop,
void *context,
int events);
-void write_object_chunk(event_loop *loop,
- int data_sock,
- void *context,
- int events) {
- client_connection *conn = (client_connection *) context;
- if (conn->transfer_queue == NULL) {
- /* If there are no objects to transfer, temporarily remove this connection
- * from the event loop. It will be reawoken when we receive another
- * PLASMA_TRANSFER request. */
- event_loop_remove_file(loop, conn->fd);
- return;
- }
-
- LOG_DEBUG("Writing data");
+void write_object_chunk(client_connection *conn, plasma_request_buffer *buf) {
ssize_t r, s;
- plasma_buffer *buf = conn->transfer_queue;
- if (conn->cursor == 0) {
- /* If the cursor is zero, we haven't sent any requests for this object yet,
- * so send the initial PLASMA_DATA request. */
- plasma_request manager_req = {.object_id = buf->object_id,
- .data_size = buf->data_size,
- .metadata_size = buf->metadata_size};
- plasma_send_request(conn->fd, PLASMA_DATA, &manager_req);
- }
-
/* Try to write one BUFSIZE at a time. */
s = buf->data_size + buf->metadata_size - conn->cursor;
if (s > BUFSIZE)
@@ -132,23 +309,67 @@ void write_object_chunk(event_loop *loop,
conn->cursor += r;
}
if (r == 0) {
- /* If we've finished writing this buffer, move on to the next transfer
- * request and reset the cursor to zero. */
- LOG_DEBUG("writing on channel %d finished", data_sock);
+ /* If we've finished writing this buffer, reset the cursor to zero. */
+ LOG_DEBUG("writing on channel %d finished", conn->fd);
conn->cursor = 0;
+ }
+}
+
+void send_queued_request(event_loop *loop,
+ int data_sock,
+ void *context,
+ int events) {
+ client_connection *conn = (client_connection *) context;
+ if (conn->transfer_queue == NULL) {
+ /* If there are no objects to transfer, temporarily remove this connection
+ * from the event loop. It will be reawoken when we receive another
+ * PLASMA_TRANSFER request. */
+ event_loop_remove_file(loop, conn->fd);
+ return;
+ }
+
+ plasma_request_buffer *buf = conn->transfer_queue;
+ plasma_request manager_req = make_plasma_request(buf->object_id);
+ switch (buf->type) {
+ case PLASMA_TRANSFER:
+ LOG_DEBUG("Requesting transfer on DB client %d",
+ get_client_id(conn->manager_state->db));
+ memcpy(manager_req.addr, conn->manager_state->addr,
+ sizeof(manager_req.addr));
+ manager_req.port = conn->manager_state->port;
+ plasma_send_request(conn->fd, buf->type, &manager_req);
+ break;
+ case PLASMA_DATA:
+ LOG_DEBUG("Transferring object to manager");
+ if (conn->cursor == 0) {
+ /* If the cursor is zero, we haven't sent any requests for this object
+ * yet,
+ * so send the initial PLASMA_DATA request. */
+ manager_req.data_size = buf->data_size;
+ manager_req.metadata_size = buf->metadata_size;
+ plasma_send_request(conn->fd, PLASMA_DATA, &manager_req);
+ }
+ write_object_chunk(conn, buf);
+ break;
+ default:
+ LOG_ERR("Buffered request has unknown type.");
+ }
+
+ /* We are done sending this request. */
+ if (conn->cursor == 0) {
LL_DELETE(conn->transfer_queue, buf);
free(buf);
}
}
-void read_object_chunk(event_loop *loop,
- int data_sock,
- void *context,
- int events) {
+void process_data_chunk(event_loop *loop,
+ int data_sock,
+ void *context,
+ int events) {
LOG_DEBUG("Reading data");
ssize_t r, s;
client_connection *conn = (client_connection *) context;
- plasma_buffer *buf = conn->transfer_queue;
+ plasma_request_buffer *buf = conn->transfer_queue;
CHECK(buf != NULL);
/* Try to read one BUFSIZE at a time. */
s = buf->data_size + buf->metadata_size - conn->cursor;
@@ -164,90 +385,120 @@ void read_object_chunk(event_loop *loop,
} else {
conn->cursor += r;
}
- if (conn->cursor == buf->data_size + buf->metadata_size) {
- LOG_DEBUG("reading on channel %d finished", data_sock);
- plasma_seal(conn->manager_state->store_conn, buf->object_id);
- LL_DELETE(conn->transfer_queue, buf);
- free(buf);
- /* Switch to listening for requests from this socket, instead of reading
- * data. */
- event_loop_remove_file(loop, data_sock);
- event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message,
- conn);
+
+ if (conn->cursor != buf->data_size + buf->metadata_size) {
+ /* If we haven't finished reading all the data for this object yet, we're
+ * done for now. */
+ return;
}
- return;
+
+ /* Seal the object.*/
+ LOG_DEBUG("reading on channel %d finished", data_sock);
+ plasma_seal(conn->manager_state->plasma_conn, buf->object_id);
+ /* Notify any clients who were waiting on a fetch to this object. */
+ client_object_connection *object_conn, *next;
+ client_connection *client_conn;
+ HASH_FIND(fetch_hh, conn->manager_state->fetch_connections, &(buf->object_id),
+ sizeof(buf->object_id), object_conn);
+ plasma_reply reply = {.object_id = buf->object_id, .has_object = 1};
+ while (object_conn) {
+ next = object_conn->next;
+ client_conn = object_conn->client_conn;
+ send_client_reply(client_conn, &reply);
+ event_loop_remove_timer(client_conn->manager_state->loop,
+ object_conn->timer);
+ remove_object_connection(client_conn, object_conn);
+ object_conn = next;
+ }
+ /* Remove the request buffer used for reading this object's data. */
+ LL_DELETE(conn->transfer_queue, buf);
+ free(buf);
+ /* Switch to listening for requests from this socket, instead of reading
+ * object data. */
+ event_loop_remove_file(loop, data_sock);
+ event_loop_add_file(loop, data_sock, EVENT_LOOP_READ, process_message, conn);
}
-void start_writing_data(event_loop *loop,
- object_id object_id,
- uint8_t addr[4],
- int port,
- client_connection *conn) {
- uint8_t *data;
- int64_t data_size;
- uint8_t *metadata;
- int64_t metadata_size;
- plasma_get(conn->manager_state->store_conn, object_id, &data_size, &data,
- &metadata_size, &metadata);
- assert(metadata == data + data_size);
- plasma_buffer *buf = malloc(sizeof(plasma_buffer));
- buf->object_id = object_id;
- buf->data = data; /* We treat this as a pointer to the
- concatenated data and metadata. */
- buf->data_size = data_size;
- buf->metadata_size = metadata_size;
- buf->writable = 0;
-
- /* Look to see if we already have a connection to this plasma manager. */
- UT_string *ip_addr;
+client_connection *get_manager_connection(plasma_manager_state *state,
+ const char *ip_addr,
+ int port) {
+ /* TODO(swang): Should probably check whether ip_addr and port belong to us.
+ */
UT_string *ip_addr_port;
- utstring_new(ip_addr);
utstring_new(ip_addr_port);
- utstring_printf(ip_addr, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
- utstring_printf(ip_addr_port, "%s:%d", utstring_body(ip_addr), port);
+ utstring_printf(ip_addr_port, "%s:%d", ip_addr, port);
client_connection *manager_conn;
- HASH_FIND_STR(conn->manager_state->manager_connections,
- utstring_body(ip_addr_port), manager_conn);
-
+ HASH_FIND_STR(state->manager_connections, utstring_body(ip_addr_port),
+ manager_conn);
+ LOG_DEBUG("Getting manager connection to %s on DB client %d",
+ utstring_body(ip_addr_port), get_client_id(state->db));
if (!manager_conn) {
/* If we don't already have a connection to this manager, start one. */
manager_conn = malloc(sizeof(client_connection));
- manager_conn->fd = plasma_manager_connect(utstring_body(ip_addr), port);
- manager_conn->manager_state = conn->manager_state;
+ manager_conn->fd = plasma_manager_connect(ip_addr, port);
+ manager_conn->manager_state = state;
manager_conn->transfer_queue = NULL;
manager_conn->cursor = 0;
-
manager_conn->ip_addr_port = strdup(utstring_body(ip_addr_port));
HASH_ADD_KEYPTR(hh, manager_conn->manager_state->manager_connections,
manager_conn->ip_addr_port,
strlen(manager_conn->ip_addr_port), manager_conn);
}
utstring_free(ip_addr_port);
+ return manager_conn;
+}
+
+void process_transfer_request(event_loop *loop,
+ object_id object_id,
+ uint8_t addr[4],
+ int port,
+ client_connection *conn) {
+ uint8_t *data;
+ int64_t data_size;
+ uint8_t *metadata;
+ int64_t metadata_size;
+ /* TODO(swang): A non-blocking plasma_get, or else we could block here
+ * forever if we don't end up sealing this object. */
+ plasma_get(conn->manager_state->plasma_conn, object_id, &data_size, &data,
+ &metadata_size, &metadata);
+ assert(metadata == data + data_size);
+ plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer));
+ buf->type = PLASMA_DATA;
+ buf->object_id = object_id;
+ buf->data = data; /* We treat this as a pointer to the
+ concatenated data and metadata. */
+ buf->data_size = data_size;
+ buf->metadata_size = metadata_size;
+
+ UT_string *ip_addr;
+ utstring_new(ip_addr);
+ utstring_printf(ip_addr, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
+ client_connection *manager_conn =
+ get_manager_connection(conn->manager_state, utstring_body(ip_addr), port);
utstring_free(ip_addr);
if (manager_conn->transfer_queue == NULL) {
/* If we already have a connection to this manager and its inactive,
* (re)register it with the event loop again. */
event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE,
- write_object_chunk, manager_conn);
+ send_queued_request, manager_conn);
}
/* Add this transfer request to this connection's transfer queue. */
LL_APPEND(manager_conn->transfer_queue, buf);
}
-void start_reading_data(event_loop *loop,
- int client_sock,
- object_id object_id,
- int64_t data_size,
- int64_t metadata_size,
- client_connection *conn) {
- plasma_buffer *buf = malloc(sizeof(plasma_buffer));
+void process_data_request(event_loop *loop,
+ int client_sock,
+ object_id object_id,
+ int64_t data_size,
+ int64_t metadata_size,
+ client_connection *conn) {
+ plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer));
buf->object_id = object_id;
buf->data_size = data_size;
buf->metadata_size = metadata_size;
- buf->writable = 1;
- plasma_create(conn->manager_state->store_conn, object_id, data_size, NULL,
+ plasma_create(conn->manager_state->plasma_conn, object_id, data_size, NULL,
metadata_size, &(buf->data));
LL_APPEND(conn->transfer_queue, buf);
conn->cursor = 0;
@@ -255,10 +506,152 @@ void start_reading_data(event_loop *loop,
/* Switch to reading the data from this socket, instead of listening for
* other requests. */
event_loop_remove_file(loop, client_sock);
- event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, read_object_chunk,
+ event_loop_add_file(loop, client_sock, EVENT_LOOP_READ, process_data_chunk,
conn);
}
+/**
+ * Queue a transfer request for the given object ID on the connection to one of
+ * the managers recorded for it. The manager is chosen from the object
+ * context's manager vector using the current retry count, so successive
+ * attempts rotate through the candidates.
+ *
+ * @param client_conn The context for the connection to this client.
+ * @param object_id The object ID we want to request a transfer of.
+ * @return Void.
+ */
+void request_transfer_from(client_connection *client_conn,
+                           object_id object_id) {
+  client_object_connection *object_conn =
+      get_object_connection(client_conn, object_id);
+  CHECK(object_conn);
+  CHECK(object_conn->manager_count > 0);
+  /* Select the manager to ask on this attempt and split its address. */
+  int manager_index = object_conn->num_retries % object_conn->manager_count;
+  char manager_addr[16];
+  int manager_port;
+  parse_ip_addr_port(object_conn->manager_vector[manager_index], manager_addr,
+                     &manager_port);
+
+  plasma_manager_state *state = client_conn->manager_state;
+  client_connection *manager_conn =
+      get_manager_connection(state, manager_addr, manager_port);
+  /* Build the request that will be sent to the selected manager. */
+  plasma_request_buffer *request = malloc(sizeof(plasma_request_buffer));
+  request->type = PLASMA_TRANSFER;
+  request->object_id = object_conn->object_id;
+
+  if (!manager_conn->transfer_queue) {
+    /* Nothing is queued for this manager, so its socket is not currently
+     * registered for writing; (re)register it with the event loop. */
+    event_loop_add_file(state->loop, manager_conn->fd, EVENT_LOOP_WRITE,
+                        send_queued_request, manager_conn);
+  }
+  /* Add this transfer request to this connection's transfer queue. */
+  LL_APPEND(manager_conn->transfer_queue, request);
+}
+
+/**
+ * Timer callback for an outstanding fetch. While retries remain, another
+ * transfer request is issued and the retry count is decremented. Once they
+ * are exhausted, failure is reported to the client and the object context is
+ * removed.
+ *
+ * @param loop The event loop that fired the timer.
+ * @param id The ID of the timer that fired.
+ * @param context The client_object_connection the timer was registered with.
+ * @return MANAGER_TIMEOUT after issuing a retry, AE_NOMORE after giving up.
+ *         NOTE(review): presumably the return value is the next timer
+ *         interval, with AE_NOMORE cancelling the timer; confirm against the
+ *         event loop implementation.
+ */
+int manager_timeout_handler(event_loop *loop, timer_id id, void *context) {
+  client_object_connection *object_conn = context;
+  client_connection *client_conn = object_conn->client_conn;
+  LOG_DEBUG("Timer went off, %d tries left", object_conn->num_retries);
+  if (object_conn->num_retries <= 0) {
+    /* Out of retries: tell the client we do not have the object. */
+    plasma_reply reply = {.object_id = object_conn->object_id,
+                          .has_object = 0};
+    send_client_reply(client_conn, &reply);
+    remove_object_connection(client_conn, object_conn);
+    return AE_NOMORE;
+  }
+  request_transfer_from(client_conn, object_conn->object_id);
+  object_conn->num_retries--;
+  return MANAGER_TIMEOUT;
+}
+
+/**
+ * Given an object ID and the managers it can be found on, start requesting a
+ * transfer from those managers.
+ *
+ * The manager addresses are copied into the object context already registered
+ * for this client and object, a timeout timer is armed, and the first transfer
+ * request is issued. If no manager has the object, failure is reported to the
+ * client immediately and the object context is removed.
+ *
+ * @param object_id The object ID we want to request a transfer of.
+ * @param manager_count The number of managers the object can be found on.
+ * @param manager_vector A vector of the addresses of the managers that the
+ *        object can be found on. The vector itself is freed by this function.
+ * @param context The client_connection for the client that made the request.
+ * @return Void.
+ */
+void request_transfer(object_id object_id,
+                      int manager_count,
+                      const char *manager_vector[],
+                      void *context) {
+  client_connection *client_conn = (client_connection *) context;
+  client_object_connection *object_conn =
+      get_object_connection(client_conn, object_id);
+  CHECK(object_conn);
+  LOG_DEBUG("Object is on %d managers", manager_count);
+  if (manager_count == 0) {
+    /* TODO(swang): Instead of immediately counting this as a failure, maybe
+     * register a Redis callback for changes to this object table entry. */
+    free(manager_vector);
+    plasma_reply reply = {.object_id = object_conn->object_id,
+                          .has_object = 0};
+    send_client_reply(client_conn, &reply);
+    remove_object_connection(client_conn, object_conn);
+    return;
+  }
+  /* Keep a private copy of every manager address so that a different manager
+   * can be picked on each attempt. */
+  object_conn->manager_count = manager_count;
+  object_conn->manager_vector = calloc(manager_count, sizeof(char *));
+  for (int idx = 0; idx < manager_count; ++idx) {
+    object_conn->manager_vector[idx] = strdup(manager_vector[idx]);
+  }
+  free(manager_vector);
+  /* Wait for the object data for the default number of retries, each of which
+   * times out after the default interval. */
+  object_conn->num_retries = NUM_RETRIES;
+  object_conn->timer =
+      event_loop_add_timer(client_conn->manager_state->loop, MANAGER_TIMEOUT,
+                           manager_timeout_handler, object_conn);
+  request_transfer_from(client_conn, object_id);
+}
+
+/**
+ * Handle a fetch of a single object on behalf of a client.
+ *
+ * Failure is reported immediately when the manager has no database
+ * connection, and success is reported immediately when the local store
+ * already contains the object. Otherwise an object context is registered for
+ * the client and request_transfer is passed to object_table_lookup to continue
+ * the fetch (presumably invoked as a callback with the lookup result).
+ *
+ * @param client_conn The connection context for the client that made the
+ *        request.
+ * @param object_id The object ID requested.
+ * @return Void.
+ */
+void process_fetch_request(client_connection *client_conn,
+                           object_id object_id) {
+  plasma_reply reply = {.object_id = object_id};
+  if (client_conn->manager_state->db == NULL) {
+    reply.has_object = 0;
+    send_client_reply(client_conn, &reply);
+    return;
+  }
+  /* Return success immediately if we already have this object. */
+  int is_local = 0;
+  plasma_contains(client_conn->manager_state->plasma_conn, object_id,
+                  &is_local);
+  if (is_local) {
+    reply.has_object = 1;
+    send_client_reply(client_conn, &reply);
+    return;
+  }
+  /* Register the new context with the current client connection. */
+  client_object_connection *object_conn =
+      add_object_connection(client_conn, object_id);
+  if (!object_conn) {
+    LOG_DEBUG("Unable to allocate memory for object context.");
+    reply.has_object = 0;
+    send_client_reply(client_conn, &reply);
+    /* Stop here: the failure has already been reported, and request_transfer
+     * CHECKs that a registered context exists for this object. */
+    return;
+  }
+  /* Request a transfer from a plasma manager that has this object. */
+  object_table_lookup(client_conn->manager_state->db, object_id,
+                      request_transfer, client_conn);
+}
+
+/**
+ * Handle a fetch of several objects by processing each object ID in turn.
+ *
+ * @param client_conn The connection context for the client that made the
+ *        request.
+ * @param num_object_ids The number of object IDs requested.
+ * @param object_ids The vector of object IDs requested.
+ * @return Void.
+ */
+void process_fetch_requests(client_connection *client_conn,
+                            int num_object_ids,
+                            object_id object_ids[]) {
+  int idx = 0;
+  while (idx < num_object_ids) {
+    /* Count the reply owed for this object before processing it, because
+     * send_client_reply decrements the count and checks that it stays
+     * non-negative. */
+    client_conn->num_return_objects += 1;
+    process_fetch_request(client_conn, object_ids[idx]);
+    ++idx;
+  }
+}
+
void process_message(event_loop *loop,
int client_sock,
void *context,
@@ -272,16 +665,27 @@ void process_message(event_loop *loop,
switch (type) {
case PLASMA_TRANSFER:
- LOG_DEBUG("transfering object to manager with port %d", req->port);
- start_writing_data(loop, req->object_id, req->addr, req->port, conn);
+ process_transfer_request(loop, req->object_ids[0], req->addr, req->port,
+ conn);
break;
case PLASMA_DATA:
- LOG_DEBUG("starting to stream data");
- start_reading_data(loop, client_sock, req->object_id, req->data_size,
- req->metadata_size, conn);
+ LOG_DEBUG("Starting to stream data");
+ process_data_request(loop, client_sock, req->object_ids[0], req->data_size,
+ req->metadata_size, conn);
+ break;
+ case PLASMA_FETCH:
+ LOG_DEBUG("Processing fetch");
+ process_fetch_requests(conn, req->num_object_ids, req->object_ids);
+ break;
+ case PLASMA_SEAL:
+ LOG_DEBUG("Publishing to object table from DB client %d.",
+ get_client_id(conn->manager_state->db));
+ object_table_add(conn->manager_state->db, req->object_ids[0]);
break;
case DISCONNECT_CLIENT: {
LOG_INFO("Disconnecting client on fd %d", client_sock);
+ /* TODO(swang): Check if this connection was to a plasma manager. If so,
+ * delete it. */
event_loop_remove_file(loop, client_sock);
close(client_sock);
free(conn);
@@ -303,47 +707,37 @@ void new_client_connection(event_loop *loop,
client_connection *conn = malloc(sizeof(client_connection));
conn->manager_state = (plasma_manager_state *) context;
conn->transfer_queue = NULL;
+ conn->fd = new_socket;
+ conn->active_objects = NULL;
+ conn->num_return_objects = 0;
event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message, conn);
- LOG_DEBUG("new connection with fd %d", new_socket);
+ LOG_DEBUG("New plasma manager connection with fd %d", new_socket);
}
void start_server(const char *store_socket_name,
const char *master_addr,
- int port) {
- struct sockaddr_in name;
- int sock = socket(PF_INET, SOCK_STREAM, 0);
- if (sock < 0) {
- LOG_ERR("could not create socket");
- exit(-1);
- }
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- name.sin_addr.s_addr = htonl(INADDR_ANY);
- /* Make the socket non-blocking. */
- int flags = fcntl(sock, F_GETFL, 0);
- CHECK(fcntl(sock, F_SETFL, flags | O_NONBLOCK) == 0);
- int on = 1;
- setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
- if (bind(sock, (struct sockaddr *) &name, sizeof(name)) < 0) {
- LOG_ERR("could not bind socket");
- exit(-1);
- }
- LOG_DEBUG("listening on port %d", port);
- if (listen(sock, 5) == -1) {
- LOG_ERR("could not listen to socket");
- exit(-1);
- }
+ int port,
+ const char *db_addr,
+ int db_port) {
+ int sock = bind_inet_sock(port);
+ CHECKM(sock >= 0, "Unable to bind to manager port");
- event_loop *loop = event_loop_create();
- plasma_manager_state *state = init_plasma_manager_state(store_socket_name);
- event_loop_add_file(loop, sock, EVENT_LOOP_READ, new_client_connection,
- state);
- event_loop_run(loop);
+ g_manager_state = init_plasma_manager_state(store_socket_name, master_addr,
+ port, db_addr, db_port);
+ CHECK(g_manager_state);
+ LOG_DEBUG("Started server connected to store %s, listening on port %d",
+ store_socket_name, port);
+ event_loop_add_file(g_manager_state->loop, sock, EVENT_LOOP_READ,
+ new_client_connection, g_manager_state);
+ event_loop_run(g_manager_state->loop);
}
/* Report "success" to valgrind. */
void signal_handler(int signal) {
if (signal == SIGTERM) {
+ if (g_manager_state) {
+ db_disconnect(g_manager_state->db);
+ }
exit(0);
}
}
@@ -356,8 +750,10 @@ int main(int argc, char *argv[]) {
char *master_addr = NULL;
/* Port number the manager should use. */
int port;
+ /* IP address and port of state database. */
+ char *db_host = NULL;
int c;
- while ((c = getopt(argc, argv, "s:m:p:")) != -1) {
+ while ((c = getopt(argc, argv, "s:m:p:d:")) != -1) {
switch (c) {
case 's':
store_socket_name = optarg;
@@ -368,6 +764,9 @@ int main(int argc, char *argv[]) {
case 'p':
port = atoi(optarg);
break;
+ case 'd':
+ db_host = optarg;
+ break;
default:
LOG_ERR("unknown option %c", c);
exit(-1);
@@ -385,5 +784,12 @@ int main(int argc, char *argv[]) {
"123.456.789.10 with -m switch");
exit(-1);
}
- start_server(store_socket_name, master_addr, port);
+ char db_addr[16];
+ int db_port;
+ if (db_host) {
+ parse_ip_addr_port(db_host, db_addr, &db_port);
+ start_server(store_socket_name, master_addr, port, db_addr, db_port);
+ } else {
+ start_server(store_socket_name, master_addr, port, NULL, 0);
+ }
}
diff --git a/src/plasma_manager.h b/src/plasma_manager.h
index 27075632e..368b1314e 100644
--- a/src/plasma_manager.h
+++ b/src/plasma_manager.h
@@ -7,7 +7,7 @@
typedef struct client_connection client_connection;
/**
- * Start transfering data to another object store manager.
+ * Process a request from another object store manager to transfer an object.
*
* @param loop This is the event loop of the plasma manager.
* @param object_id The object_id of the object we will be sending.
@@ -15,18 +15,20 @@ typedef struct client_connection client_connection;
* to.
* @param port The port of the plasma manager we are sending the object to.
* @param conn The client_connection to the other plasma manager.
+ * @return Void.
*
- * This establishes a connection to the remote manager and sends the data
- * header to the other object manager.
+ * This establishes a connection to the remote manager if one doesn't already
+ * exist, and queues up the request to transfer the data to the other object
+ * manager.
*/
-void start_writing_data(event_loop *loop,
- object_id object_id,
- uint8_t addr[4],
- int port,
- client_connection *conn);
+void process_transfer_request(event_loop *loop,
+                              object_id object_id,
+                              uint8_t addr[4],
+                              int port,
+                              client_connection *conn);
/**
- * Start reading data from another object manager.
+ * Process a request from another object store manager to receive data.
*
* @param loop This is the event loop of the plasma manager.
* @param client_sock The connection to the other plasma manager.
@@ -34,46 +36,84 @@ void start_writing_data(event_loop *loop,
* @param data_size Size of the object.
* @param metadata_size Size of the metadata.
* @param conn The client_connection to the other plasma manager.
+ * @return Void.
*
- * Initializes the object we are going to write to in the
- * local plasma store and then switches the data socket to reading mode.
+ * Initializes the object we are going to write to in the local plasma store
+ * and then switches the data socket to read the raw object bytes instead of
+ * plasma requests.
*/
-void start_reading_data(event_loop *loop,
- int client_sock,
- object_id object_id,
- int64_t data_size,
- int64_t metadata_size,
- client_connection *conn);
+void process_data_request(event_loop *loop,
+                          int client_sock,
+                          object_id object_id,
+                          int64_t data_size,
+                          int64_t metadata_size,
+                          client_connection *conn);
/**
* Read the next chunk of the object in transit from the plasma manager
- * that is connected to the connection with index "conn_index". Once all data
- * has been read, the socket switches to listening for the next request.
+ * connected to the given socket. Once all data for this object has been read,
+ * the socket switches to listening for the next plasma request.
*
* @param loop This is the event loop of the plasma manager.
* @param data_sock The connection to the other plasma manager.
* @param context The client_connection to the other plasma manager.
- *
+ * @return Void.
*/
-void read_object_chunk(event_loop *loop,
- int data_sock,
- void *context,
- int events);
+void process_data_chunk(event_loop *loop,
+ int data_sock,
+ void *context,
+ int events);
/**
- * Write the next chunk of the object currently transfered to the plasma manager
- * that is connected to the socket "data_sock". If no data has been sent yet,
- * the initial handshake to transfer the object size is performed.
+ * Process a fetch request. The fetch request tries:
+ * 1) If there is no connection to the database, return failure to the client.
+ * 2) If the object is available locally, return success to the client.
+ * 3) Query the database for plasma managers that the object might be on.
+ * 4) Request a transfer from each of the managers that the object might be on
+ * until we receive the data, or until we timeout.
+ * 5) Returns success or failure to the client depending on whether we received
+ * the data or not.
+ *
+ * @param client_conn The connection context for the client that made the
+ * request.
+ * @param object_id The object ID requested.
+ * @return Void.
+ */
+void process_fetch_request(client_connection *client_conn, object_id object_id);
+
+/**
+ * Process a fetch request for multiple objects. The success of each object
+ * will be written back individually to the socket connected to the client that
+ * made the request in a plasma_reply. See documentation for
+ * process_fetch_request for the sequence of operations per object.
+ *
+ * @param client_conn The connection context for the client that made the
+ * request.
+ * @param object_id_count The number of object IDs requested.
+ * @param object_ids[] The vector of object IDs requested.
+ * @return Void.
+ */
+void process_fetch_requests(client_connection *client_conn,
+ int object_id_count,
+ object_id object_ids[]);
+
+/**
+ * Send the next request queued for the other plasma manager connected to the
+ * socket "data_sock". This could be a request to either write object data or
+ * request object data. If the request is to write object data and no data has
+ * been sent yet, the initial handshake to transfer the object size is
+ * performed.
*
* @param loop This is the event loop of the plasma manager.
* @param data_sock This is the socket the other plasma manager is listening on.
* @param context The client_connection to the other plasma manager, contains a
* queue of objects that will be sent.
+ * @return Void.
*/
-void write_object_chunk(event_loop *loop,
- int data_sock,
- void *context,
- int events);
+void send_queued_request(event_loop *loop,
+ int data_sock,
+ void *context,
+ int events);
/**
* Register a new client connection with the plasma manager. A client can
@@ -82,7 +122,7 @@ void write_object_chunk(event_loop *loop,
* @param loop This is the event loop of the plasma manager.
* @param listener_socket The socket the plasma manager is listening on.
* @param context The plasma manager state.
- *
+ * @return Void.
*/
void new_client_connection(event_loop *loop,
int listener_sock,
diff --git a/src/plasma_store.c b/src/plasma_store.c
index 3bdc1364a..3cb251234 100644
--- a/src/plasma_store.c
+++ b/src/plasma_store.c
@@ -113,7 +113,7 @@ plasma_store_state *init_plasma_store(event_loop *loop) {
}
/* Create a new object buffer in the hash table. */
-void create_object(plasma_store_state *s,
+void create_object(plasma_store_state *plasma_state,
object_id object_id,
int64_t data_size,
int64_t metadata_size,
@@ -121,7 +121,10 @@ void create_object(plasma_store_state *s,
LOG_DEBUG("creating object"); /* TODO(pcm): add object_id here */
object_table_entry *entry;
- HASH_FIND(handle, s->open_objects, &object_id, sizeof(object_id), entry);
+ HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id),
+ entry);
+ /* TODO(swang): Return this error to the client instead of
+ * exiting. */
CHECKM(entry == NULL, "Cannot create object twice.");
uint8_t *pointer = dlmalloc(data_size + metadata_size);
@@ -140,7 +143,8 @@ void create_object(plasma_store_state *s,
entry->fd = fd;
entry->map_size = map_size;
entry->offset = offset;
- HASH_ADD(handle, s->open_objects, object_id, sizeof(object_id), entry);
+ HASH_ADD(handle, plasma_state->open_objects, object_id, sizeof(object_id),
+ entry);
result->handle.store_fd = fd;
result->handle.mmap_size = map_size;
result->data_offset = offset;
@@ -150,12 +154,13 @@ void create_object(plasma_store_state *s,
}
/* Get an object from the hash table. */
-int get_object(plasma_store_state *s,
+int get_object(plasma_store_state *plasma_state,
int conn,
object_id object_id,
plasma_object *result) {
object_table_entry *entry;
- HASH_FIND(handle, s->sealed_objects, &object_id, sizeof(object_id), entry);
+ HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
+ entry);
if (entry) {
result->handle.store_fd = entry->fd;
result->handle.mmap_size = entry->map_size;
@@ -167,15 +172,15 @@ int get_object(plasma_store_state *s,
} else {
object_notify_entry *notify_entry;
LOG_DEBUG("object not in hash table of sealed objects");
- HASH_FIND(handle, s->objects_notify, &object_id, sizeof(object_id),
- notify_entry);
+ HASH_FIND(handle, plasma_state->objects_notify, &object_id,
+ sizeof(object_id), notify_entry);
if (!notify_entry) {
notify_entry = malloc(sizeof(object_notify_entry));
memset(notify_entry, 0, sizeof(object_notify_entry));
utarray_new(notify_entry->conns, &ut_int_icd);
memcpy(&notify_entry->object_id, &object_id, 20);
- HASH_ADD(handle, s->objects_notify, object_id, sizeof(object_id),
- notify_entry);
+ HASH_ADD(handle, plasma_state->objects_notify, object_id,
+ sizeof(object_id), notify_entry);
}
utarray_push_back(notify_entry->conns, &conn);
}
@@ -183,36 +188,40 @@ int get_object(plasma_store_state *s,
}
/* Check if an object is present. */
-int contains_object(plasma_store_state *s, object_id object_id) {
+int contains_object(plasma_store_state *plasma_state, object_id object_id) {
object_table_entry *entry;
- HASH_FIND(handle, s->sealed_objects, &object_id, sizeof(object_id), entry);
+ HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
+ entry);
return entry ? OBJECT_FOUND : OBJECT_NOT_FOUND;
}
/* Seal an object that has been created in the hash table. */
-void seal_object(plasma_store_state *s,
+void seal_object(plasma_store_state *plasma_state,
object_id object_id,
UT_array **conns,
plasma_object *result) {
LOG_DEBUG("sealing object"); // TODO(pcm): add object_id here
object_table_entry *entry;
- HASH_FIND(handle, s->open_objects, &object_id, sizeof(object_id), entry);
+ HASH_FIND(handle, plasma_state->open_objects, &object_id, sizeof(object_id),
+ entry);
if (!entry) {
return; /* TODO(pcm): return error */
}
- HASH_DELETE(handle, s->open_objects, entry);
- HASH_ADD(handle, s->sealed_objects, object_id, sizeof(object_id), entry);
+ HASH_DELETE(handle, plasma_state->open_objects, entry);
+ HASH_ADD(handle, plasma_state->sealed_objects, object_id, sizeof(object_id),
+ entry);
/* Inform all subscribers that a new object has been sealed. */
notification_queue *queue, *temp_queue;
- HASH_ITER(hh, s->pending_notifications, queue, temp_queue) {
+ HASH_ITER(hh, plasma_state->pending_notifications, queue, temp_queue) {
utarray_push_back(queue->object_ids, &object_id);
- send_notifications(s->loop, queue->subscriber_fd, s, 0);
+ send_notifications(plasma_state->loop, queue->subscriber_fd, plasma_state,
+ 0);
}
/* Inform processes getting this object that the object is ready now. */
object_notify_entry *notify_entry;
- HASH_FIND(handle, s->objects_notify, &object_id, sizeof(object_id),
+ HASH_FIND(handle, plasma_state->objects_notify, &object_id, sizeof(object_id),
notify_entry);
if (!notify_entry) {
*conns = NULL;
@@ -224,22 +233,23 @@ void seal_object(plasma_store_state *s,
result->metadata_offset = entry->offset + entry->info.data_size;
result->data_size = entry->info.data_size;
result->metadata_size = entry->info.metadata_size;
- HASH_DELETE(handle, s->objects_notify, notify_entry);
+ HASH_DELETE(handle, plasma_state->objects_notify, notify_entry);
*conns = notify_entry->conns;
free(notify_entry);
}
/* Delete an object that has been created in the hash table. */
-void delete_object(plasma_store_state *s, object_id object_id) {
+void delete_object(plasma_store_state *plasma_state, object_id object_id) {
LOG_DEBUG("deleting object"); // TODO(rkn): add object_id here
object_table_entry *entry;
- HASH_FIND(handle, s->sealed_objects, &object_id, sizeof(object_id), entry);
+ HASH_FIND(handle, plasma_state->sealed_objects, &object_id, sizeof(object_id),
+ entry);
/* TODO(rkn): This should probably not fail, but should instead throw an
* error. Maybe we should also support deleting objects that have been created
* but not sealed. */
CHECKM(entry != NULL, "To delete an object it must have been sealed.");
uint8_t *pointer = entry->pointer;
- HASH_DELETE(handle, s->sealed_objects, entry);
+ HASH_DELETE(handle, plasma_state->sealed_objects, entry);
dlfree(pointer);
free(entry);
}
@@ -249,10 +259,10 @@ void send_notifications(event_loop *loop,
int client_sock,
void *context,
int events) {
- plasma_store_state *s = context;
+ plasma_store_state *plasma_state = context;
notification_queue *queue;
- HASH_FIND_INT(s->pending_notifications, &client_sock, queue);
+ HASH_FIND_INT(plasma_state->pending_notifications, &client_sock, queue);
CHECK(queue != NULL);
int num_processed = 0;
@@ -280,13 +290,13 @@ void send_notifications(event_loop *loop,
}
/* Subscribe to notifications about sealed objects. */
-void subscribe_to_updates(plasma_store_state *s, int conn) {
+void subscribe_to_updates(plasma_store_state *plasma_state, int conn) {
LOG_DEBUG("subscribing to updates");
char dummy;
int fd = recv_fd(conn, &dummy, 1);
- CHECKM(HASH_CNT(handle, s->open_objects) == 0,
+ CHECKM(HASH_CNT(handle, plasma_state->open_objects) == 0,
"plasma_subscribe should be called before any objects are created.");
- CHECKM(HASH_CNT(handle, s->sealed_objects) == 0,
+ CHECKM(HASH_CNT(handle, plasma_state->sealed_objects) == 0,
"plasma_subscribe should be called before any objects are created.");
/* Create a new array to buffer notifications that can't be sent to the
* subscriber yet because the socket send buffer is full. TODO(rkn): the queue
@@ -295,47 +305,49 @@ void subscribe_to_updates(plasma_store_state *s, int conn) {
(notification_queue *) malloc(sizeof(notification_queue));
queue->subscriber_fd = fd;
utarray_new(queue->object_ids, &object_id_icd);
- HASH_ADD_INT(s->pending_notifications, subscriber_fd, queue);
+ HASH_ADD_INT(plasma_state->pending_notifications, subscriber_fd, queue);
/* Add a callback to the event loop to send queued notifications whenever
* there is room in the socket's send buffer. */
- event_loop_add_file(s->loop, fd, EVENT_LOOP_WRITE, send_notifications, s);
+ event_loop_add_file(plasma_state->loop, fd, EVENT_LOOP_WRITE,
+ send_notifications, plasma_state);
}
void process_message(event_loop *loop,
int client_sock,
void *context,
int events) {
- plasma_store_state *s = context;
+ plasma_store_state *plasma_state = context;
int64_t type;
int64_t length;
plasma_request *req;
read_message(client_sock, &type, &length, (uint8_t **) &req);
+ /* We're only sending a single object ID at a time for now. */
plasma_reply reply;
memset(&reply, 0, sizeof(reply));
UT_array *conns;
switch (type) {
case PLASMA_CREATE:
- create_object(s, req->object_id, req->data_size, req->metadata_size,
- &reply.object);
+ create_object(plasma_state, req->object_ids[0], req->data_size,
+ req->metadata_size, &reply.object);
send_fd(client_sock, reply.object.handle.store_fd, (char *) &reply,
sizeof(reply));
break;
case PLASMA_GET:
- if (get_object(s, client_sock, req->object_id, &reply.object) ==
- OBJECT_FOUND) {
+ if (get_object(plasma_state, client_sock, req->object_ids[0],
+ &reply.object) == OBJECT_FOUND) {
send_fd(client_sock, reply.object.handle.store_fd, (char *) &reply,
sizeof(reply));
}
break;
case PLASMA_CONTAINS:
- if (contains_object(s, req->object_id) == OBJECT_FOUND) {
+ if (contains_object(plasma_state, req->object_ids[0]) == OBJECT_FOUND) {
reply.has_object = 1;
}
plasma_send_reply(client_sock, &reply);
break;
case PLASMA_SEAL:
- seal_object(s, req->object_id, &conns, &reply.object);
+ seal_object(plasma_state, req->object_ids[0], &conns, &reply.object);
if (conns) {
for (int *c = (int *) utarray_front(conns); c != NULL;
c = (int *) utarray_next(conns, c)) {
@@ -346,10 +358,10 @@ void process_message(event_loop *loop,
}
break;
case PLASMA_DELETE:
- delete_object(s, req->object_id);
+ delete_object(plasma_state, req->object_ids[0]);
break;
case PLASMA_SUBSCRIBE:
- subscribe_to_updates(s, client_sock);
+ subscribe_to_updates(plasma_state, client_sock);
break;
case DISCONNECT_CLIENT: {
LOG_DEBUG("Disconnecting client on fd %d", client_sock);
diff --git a/src/plasma_store.h b/src/plasma_store.h
index dd5e963fb..b22302209 100644
--- a/src/plasma_store.h
+++ b/src/plasma_store.h
@@ -21,7 +21,9 @@ void create_object(plasma_store_state *s,
plasma_object *result);
/**
- * Get an object:
+ * Get an object. This method assumes that we currently have or will
+ * eventually have this object sealed. If the object has not yet been sealed,
+ * the client that requested the object will be notified when it is sealed.
*
* @param s The plasma store state.
* @param conn The client connection that requests the object.
diff --git a/test/test.py b/test/test.py
index 4e08ed67e..94b2acbab 100644
--- a/test/test.py
+++ b/test/test.py
@@ -43,6 +43,15 @@ def create_object(client, data_size, metadata_size, seal=True):
client.seal(object_id)
return object_id, memory_buffer, metadata
+def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None):
+ if memory_buffer is not None:
+ unit_test.assertEqual(memory_buffer[:], client2.get(object_id)[:])
+ if metadata is not None:
+ unit_test.assertEqual(metadata[:], client2.get_metadata(object_id)[:])
+ unit_test.assertEqual(client1.get(object_id)[:], client2.get(object_id)[:])
+ unit_test.assertEqual(client1.get_metadata(object_id)[:],
+ client2.get_metadata(object_id)[:])
+
class TestPlasmaClient(unittest.TestCase):
def setUp(self):
@@ -207,22 +216,42 @@ class TestPlasmaManager(unittest.TestCase):
plasma_store_command2 = [plasma_store_executable, "-s", store_name2]
if USE_VALGRIND:
- self.p2 = subprocess.Popen(["valgrind", "--track-origins=yes", "--error-exitcode=1"] + plasma_store_command1)
- self.p3 = subprocess.Popen(["valgrind", "--track-origins=yes", "--error-exitcode=1"] + plasma_store_command2)
+ self.p2 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--error-exitcode=1"] + plasma_store_command1)
+ self.p3 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--error-exitcode=1"] + plasma_store_command2)
else:
self.p2 = subprocess.Popen(plasma_store_command1)
self.p3 = subprocess.Popen(plasma_store_command2)
+ # Start a Redis server.
+ redis_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../common/thirdparty/redis-3.2.3/src/redis-server")
+ self.redis_process = None
+ manager_redis_args = []
+ if os.path.exists(redis_path):
+ redis_port = 6379
+ with open(os.devnull, 'w') as FNULL:
+ self.redis_process = subprocess.Popen([redis_path,
+ "--port", str(redis_port)],
+ stdout=FNULL)
+ time.sleep(0.1)
+ manager_redis_args = ["-d", "{addr}:{port}".format(addr="127.0.0.1",
+ port=redis_port)]
+
# Start two PlasmaManagers.
self.port1 = random.randint(10000, 50000)
self.port2 = random.randint(10000, 50000)
plasma_manager_executable = os.path.join(os.path.abspath(os.path.dirname(__file__)), "../build/plasma_manager")
- plasma_manager_command1 = [plasma_manager_executable, "-s", store_name1, "-m", "127.0.0.1", "-p", str(self.port1)]
- plasma_manager_command2 = [plasma_manager_executable, "-s", store_name2, "-m", "127.0.0.1", "-p", str(self.port2)]
+ plasma_manager_command1 = [plasma_manager_executable,
+ "-s", store_name1,
+ "-m", "127.0.0.1",
+ "-p", str(self.port1)] + manager_redis_args
+ plasma_manager_command2 = [plasma_manager_executable,
+ "-s", store_name2,
+ "-m", "127.0.0.1",
+ "-p", str(self.port2)] + manager_redis_args
if USE_VALGRIND:
- self.p4 = subprocess.Popen(["valgrind", "--track-origins=yes", "--error-exitcode=1"] + plasma_manager_command1)
- self.p5 = subprocess.Popen(["valgrind", "--track-origins=yes", "--error-exitcode=1"] + plasma_manager_command2)
+ self.p4 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--error-exitcode=1"] + plasma_manager_command1)
+ self.p5 = subprocess.Popen(["valgrind", "--track-origins=yes", "--leak-check=full", "--error-exitcode=1"] + plasma_manager_command2)
time.sleep(2.0)
else:
self.p4 = subprocess.Popen(plasma_manager_command1)
@@ -253,6 +282,63 @@ class TestPlasmaManager(unittest.TestCase):
self.p3.kill()
self.p4.kill()
self.p5.kill()
+ if self.redis_process:
+ self.redis_process.kill()
+
+  def test_fetch(self):
+    # Fetch one object in each direction between the two plasma stores.
+    if self.redis_process is None:
+      self.fail("Cannot test fetch without a running redis instance.")
+    for _ in range(100):
+      # Create an object.
+      object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
+      # Fetch the object from the other plasma store.
+      # TODO(swang): This line is a hack! It makes sure that the entry will be
+      # in the object table once we call the fetch operation. Remove once
+      # retries are implemented by Ray common.
+      time.sleep(0.1)
+      successes = self.client2.fetch([object_id1])
+      self.assertEqual(successes, [True])
+      # Compare the two buffers.
+      assert_get_object_equal(self, self.client1, self.client2, object_id1,
+                              memory_buffer=memory_buffer1, metadata=metadata1)
+      # Fetch in the other direction. These should return quickly because
+      # client1 already has the object.
+      successes = self.client1.fetch([object_id1])
+      self.assertEqual(successes, [True])
+      assert_get_object_equal(self, self.client2, self.client1, object_id1,
+                              memory_buffer=memory_buffer1, metadata=metadata1)
+
+  def test_fetch_multiple(self):
+    # Fetch two existing objects and one nonexistent one in a single request.
+    if self.redis_process is None:
+      self.fail("Cannot test fetch without a running redis instance.")
+    for _ in range(20):
+      # Create two objects and a third fake one that doesn't exist.
+      object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000)
+      missing_object_id = random_object_id()
+      object_id2, memory_buffer2, metadata2 = create_object(self.client1, 2000, 2000)
+      object_ids = [object_id1, missing_object_id, object_id2]
+      # Fetch the objects from the other plasma store. The second object ID
+      # should time out since it does not exist.
+      # TODO(swang): This line is a hack! It makes sure that the entry will be
+      # in the object table once we call the fetch operation. Remove once
+      # retries are implemented by Ray common.
+      time.sleep(0.1)
+      successes = self.client2.fetch(object_ids)
+      self.assertEqual(successes, [True, False, True])
+      # Compare the buffers of the objects that do exist.
+      assert_get_object_equal(self, self.client1, self.client2, object_id1,
+                              memory_buffer=memory_buffer1, metadata=metadata1)
+      assert_get_object_equal(self, self.client1, self.client2, object_id2,
+                              memory_buffer=memory_buffer2, metadata=metadata2)
+      # Fetch in the other direction. The fake object still does not exist.
+      successes = self.client1.fetch(object_ids)
+      self.assertEqual(successes, [True, False, True])
+      assert_get_object_equal(self, self.client2, self.client1, object_id1,
+                              memory_buffer=memory_buffer1, metadata=metadata1)
+      assert_get_object_equal(self, self.client2, self.client1, object_id2,
+                              memory_buffer=memory_buffer2, metadata=metadata2)
def test_transfer(self):
for _ in range(100):
@@ -261,25 +347,21 @@ class TestPlasmaManager(unittest.TestCase):
# Transfer the buffer to the the other PlasmaStore.
self.client1.transfer("127.0.0.1", self.port2, object_id1)
# Compare the two buffers.
- self.assertEqual(memory_buffer1[:], self.client2.get(object_id1)[:])
- self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:])
- self.assertEqual(metadata1[:], self.client2.get_metadata(object_id1)[:])
- self.assertEqual(self.client1.get_metadata(object_id1)[:], self.client2.get_metadata(object_id1)[:])
+ assert_get_object_equal(self, self.client1, self.client2, object_id1,
+ memory_buffer=memory_buffer1, metadata=metadata1)
# Transfer the buffer again.
self.client1.transfer("127.0.0.1", self.port2, object_id1)
- self.assertEqual(metadata1[:], self.client2.get_metadata(object_id1)[:])
# Compare the two buffers.
- self.assertEqual(self.client1.get(object_id1)[:], self.client2.get(object_id1)[:])
+ assert_get_object_equal(self, self.client1, self.client2, object_id1,
+ memory_buffer=memory_buffer1, metadata=metadata1)
# Create an object.
object_id2, memory_buffer2, metadata2 = create_object(self.client2, 20000, 20000)
# Transfer the buffer to the the other PlasmaStore.
self.client2.transfer("127.0.0.1", self.port1, object_id2)
# Compare the two buffers.
- self.assertEqual(memory_buffer2[:], self.client2.get(object_id2)[:])
- self.assertEqual(self.client1.get(object_id2)[:], self.client2.get(object_id2)[:])
- self.assertEqual(metadata2[:], self.client2.get_metadata(object_id2)[:])
- self.assertEqual(self.client1.get_metadata(object_id2)[:], self.client2.get_metadata(object_id2)[:])
+ assert_get_object_equal(self, self.client1, self.client2, object_id2,
+ memory_buffer=memory_buffer2, metadata=metadata2)
def test_illegal_functionality(self):
# Create an object id string.
@@ -307,8 +389,8 @@ class TestPlasmaManager(unittest.TestCase):
if __name__ == "__main__":
if len(sys.argv) > 1:
# pop the argument so we don't mess with unittest's own argument parser
- arg = sys.argv.pop()
- if arg == "valgrind":
+ if sys.argv[-1] == "valgrind":
+ arg = sys.argv.pop()
USE_VALGRIND = True
print("Using valgrind for tests")
unittest.main(verbosity=2)