diff --git a/python/plasma/plasma.py b/python/plasma/plasma.py index 819cebcbe..aea9b993d 100644 --- a/python/plasma/plasma.py +++ b/python/plasma/plasma.py @@ -142,29 +142,47 @@ class PlasmaClient(object): buff = libplasma.create(self.conn, object_id, size, metadata) return PlasmaBuffer(buff, object_id, self) - def get(self, object_id): + def get(self, object_ids, timeout_ms=-1): """Create a buffer from the PlasmaStore based on object ID. If the object has not been sealed yet, this call will block. The retrieved buffer is immutable. Args: - object_id (str): A string used to identify an object. + object_ids (List[str]): A list of strings used to identify some objects. + timeout_ms (int): The number of milliseconds that the get call should + block before timing out and returning. """ - buff = libplasma.get(self.conn, object_id)[0] - return PlasmaBuffer(buff, object_id, self) + results = libplasma.get(self.conn, object_ids, timeout_ms) + assert len(object_ids) == len(results) + returns = [] + for i in range(len(object_ids)): + if results[i] is None: + returns.append(None) + else: + returns.append(PlasmaBuffer(results[i][0], object_ids[i], self)) + return returns - def get_metadata(self, object_id): + def get_metadata(self, object_ids, timeout_ms=-1): """Create a buffer from the PlasmaStore based on object ID. If the object has not been sealed yet, this call will block until the object has been sealed. The retrieved buffer is immutable. Args: - object_id (str): A string used to identify an object. + object_ids (List[str]): A list of strings used to identify some objects. + timeout_ms (int): The number of milliseconds that the get call should + block before timing out and returning. """ - buff = libplasma.get(self.conn, object_id)[1] - return PlasmaBuffer(buff, object_id, self) + results = libplasma.get(self.conn, object_ids, timeout_ms) + assert len(object_ids) == len(results) + returns = [] + for i in range(len(object_ids)): + if results[i] is None: + returns.append(None) + else: + returns.append(PlasmaBuffer(results[i][1], object_ids[i], self)) + return returns def contains(self, object_id): """Check if the object is present and has been sealed in the PlasmaStore. diff --git a/python/plasma/test/test.py b/python/plasma/test/test.py index 7618d571e..1a9101a5a 100644 --- a/python/plasma/test/test.py +++ b/python/plasma/test/test.py @@ -22,10 +22,10 @@ USE_VALGRIND = False PLASMA_STORE_MEMORY = 1000000000 def assert_get_object_equal(unit_test, client1, client2, object_id, memory_buffer=None, metadata=None): - client1_buff = client1.get(object_id) - client2_buff = client2.get(object_id) - client1_metadata = client1.get_metadata(object_id) - client2_metadata = client2.get_metadata(object_id) + client1_buff = client1.get([object_id])[0] + client2_buff = client2.get([object_id])[0] + client1_metadata = client1.get_metadata([object_id])[0] + client2_metadata = client2.get_metadata([object_id])[0] unit_test.assertEqual(len(client1_buff), len(client2_buff)) unit_test.assertEqual(len(client1_metadata), len(client2_metadata)) # Check that the buffers from the two clients are the same. @@ -72,7 +72,7 @@ class TestPlasmaClient(unittest.TestCase): # Seal the object. self.plasma_client.seal(object_id) # Get the object. - memory_buffer = self.plasma_client.get(object_id) + memory_buffer = self.plasma_client.get([object_id])[0] for i in range(length): self.assertEqual(memory_buffer[i], chr(i % 256)) @@ -89,11 +89,11 @@ class TestPlasmaClient(unittest.TestCase): # Seal the object. self.plasma_client.seal(object_id) # Get the object. - memory_buffer = self.plasma_client.get(object_id) + memory_buffer = self.plasma_client.get([object_id])[0] for i in range(length): self.assertEqual(memory_buffer[i], chr(i % 256)) # Get the metadata. - metadata_buffer = self.plasma_client.get_metadata(object_id) + metadata_buffer = self.plasma_client.get_metadata([object_id])[0] self.assertEqual(len(metadata), len(metadata_buffer)) for i in range(len(metadata)): self.assertEqual(chr(metadata[i]), metadata_buffer[i]) @@ -112,6 +112,35 @@ class TestPlasmaClient(unittest.TestCase): else: self.assertTrue(False) + def test_get(self): + num_object_ids = 100 + # Test timing out of get with various timeouts. + for timeout in [0, 10, 100, 1000]: + object_ids = [random_object_id() for _ in range(num_object_ids)] + results = self.plasma_client.get(object_ids, timeout_ms=timeout) + self.assertEqual(results, num_object_ids * [None]) + + data_buffers = [] + metadata_buffers = [] + for i in range(num_object_ids): + if i % 2 == 0: + data_buffer, metadata_buffer = create_object_with_id(self.plasma_client, object_ids[i], 2000, 2000) + data_buffers.append(data_buffer) + metadata_buffers.append(metadata_buffer) + + # Test timing out from some but not all get calls with various timeouts. + for timeout in [0, 10, 100, 1000]: + data_results = self.plasma_client.get(object_ids, timeout_ms=timeout) + metadata_results = self.plasma_client.get(object_ids, timeout_ms=timeout) + for i in range(num_object_ids): + if i % 2 == 0: + self.assertTrue(plasma.buffers_equal(data_buffers[i // 2], data_results[i])) + # TODO(rkn): We should compare the metadata as well. But currently the + # types are different (e.g., memoryview versus bytearray). + # self.assertTrue(plasma.buffers_equal(metadata_buffers[i // 2], metadata_results[i])) + else: + self.assertIsNone(results[i]) + def test_store_full(self): # The store is started with 1GB, so make sure that create throws an # exception when it is full. @@ -336,7 +365,7 @@ class TestPlasmaClient(unittest.TestCase): # memory_buffer[0] = chr(0) # self.assertRaises(Exception, illegal_assignment) # Get the object. - memory_buffer = self.plasma_client.get(object_id) + memory_buffer = self.plasma_client.get([object_id])[0] # Make sure the object is read only. def illegal_assignment(): memory_buffer[0] = chr(0) diff --git a/python/ray/worker.py b/python/ray/worker.py index 5f674a855..011d4032b 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -428,20 +428,29 @@ class Worker(object): # Optionally do something with the contained_objectids here. contained_objectids = [] - def get_object(self, objectid): - """Get the value in the local object store associated with objectid. + def get_object(self, object_ids): + """Get the value or values in the local object store associated with object_ids. - Return the value from the local object store for objectid. This will block - until the value for objectid has been written to the local object store. + Return the values from the local object store for object_ids. This will block + until all the values for object_ids have been written to the local object store. Args: - objectid (object_id.ObjectID): The object ID of the value to retrieve. + object_ids (List[object_id.ObjectID]): A list of the object IDs whose + values should be retrieved. """ - self.plasma_client.fetch([objectid.id()]) - deserialized = numbuf.retrieve_list(objectid.id(), self.plasma_client.conn) + self.plasma_client.fetch([object_id.id() for object_id in object_ids]) + # We currently pass in a timeout of one second. + unready_ids = object_ids + while len(unready_ids) > 0: + results = numbuf.retrieve_list([object_id.id() for object_id in object_ids], self.plasma_client.conn, 1000) + unready_ids = [object_id for (object_id, val) in results if val is None] + # This would be a natural place to issue a command to reconstruct some of + # the objects. # Unwrap the object from the list (it was wrapped put_object). - assert len(deserialized) == 1 - return deserialized[0] + assert len(results) == len(object_ids) + for i in range(len(results)): + assert results[i][0] == object_ids[i].id() + return [result[1][0] for result in results] def submit_task(self, function_id, func_name, args): """Submit a remote task to the scheduler. @@ -1228,17 +1237,17 @@ def flush_log(worker=global_worker): worker.photon_client.log_event(event_log_key, event_log_value) worker.events = [] -def get(objectid, worker=global_worker): +def get(object_ids, worker=global_worker): """Get a remote object or a list of remote objects from the object store. - This method blocks until the object corresponding to objectid is available in + This method blocks until the object corresponding to the object ID is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been - created). If objectid is a list, then the objects corresponding to each object + created). If object_ids is a list, then the objects corresponding to each object in the list will be returned. Args: - objectid: Object ID of the object to get or a list of object IDs to get. + object_ids: Object ID of the object to get or a list of object IDs to get. Returns: A Python object or a list of Python objects. @@ -1249,19 +1258,19 @@ def get(objectid, worker=global_worker): if worker.mode == PYTHON_MODE: # In PYTHON_MODE, ray.get is the identity operation (the input will actually be a value not an objectid) - return objectid - if isinstance(objectid, list): - values = [worker.get_object(x) for x in objectid] + return object_ids + if isinstance(object_ids, list): + values = worker.get_object(object_ids) for i, value in enumerate(values): if isinstance(value, RayTaskError): - raise RayGetError(objectid[i], value) + raise RayGetError(object_ids[i], value) return values else: - value = worker.get_object(objectid) + value = worker.get_object([object_ids])[0] if isinstance(value, RayTaskError): # If the result is a RayTaskError, then the task that created this object # failed, and we should propagate the error message here. - raise RayGetError(objectid, value) + raise RayGetError(object_ids, value) return value def put(value, worker=global_worker): @@ -1705,7 +1714,7 @@ def get_arguments_for_execution(function, serialized_args, worker=global_worker) for (i, arg) in enumerate(serialized_args): if isinstance(arg, photon.ObjectID): # get the object from the local object store - argument = worker.get_object(arg) + argument = worker.get_object([arg])[0] if isinstance(argument, RayTaskError): # If the result is a RayTaskError, then the task that created this # object failed, and we should propagate the error message here. diff --git a/src/numbuf/python/src/pynumbuf/numbuf.cc b/src/numbuf/python/src/pynumbuf/numbuf.cc index fc6aaf5b6..d54ec6091 100644 --- a/src/numbuf/python/src/pynumbuf/numbuf.cc +++ b/src/numbuf/python/src/pynumbuf/numbuf.cc @@ -297,42 +297,80 @@ static PyObject* store_list(PyObject* self, PyObject* args) { * Python objects from the plasma data according to the schema and * returns the object. * - * @param args Object ID of the PyList to be retrieved and connection to the - * plasma store. - * @return The PyList. + * @param args The first argument is a list of object IDs of the lists to be + * retrieved and the second argument is the connection to the plasma + * store. + * @return A list of tuples, where the first element in the tuple is the object + * ID (appearing in the same order as in the argument to the method), + * and the second element in the tuple is the retrieved list (or None) + * if no value was retrieved. */ static PyObject* retrieve_list(PyObject* self, PyObject* args) { - object_id obj_id; + PyObject* object_id_list; PyObject* plasma_conn; - if (!PyArg_ParseTuple(args, "O&O", PyStringToUniqueID, &obj_id, &plasma_conn)) { + long long timeout_ms; + if (!PyArg_ParseTuple(args, "OOL", &object_id_list, &plasma_conn, &timeout_ms)) { return NULL; } plasma_connection* conn; if (!PyObjectToPlasmaConnection(plasma_conn, &conn)) { return NULL; } - object_id* buffer_obj_id = new object_id(obj_id); - /* This keeps a Plasma buffer in scope as long as an object that is backed by that - * buffer is in scope. This prevents memory in the object store from getting - * released while it is still being used to back a Python object. */ - PyObject* base = PyCapsule_New(buffer_obj_id, "buffer", BufferCapsule_Destructor); - PyCapsule_SetContext(base, plasma_conn); - Py_XINCREF(plasma_conn); - int64_t size, metadata_size; - uint8_t *data, *metadata; - plasma_get(conn, obj_id, &size, &data, &metadata_size, &metadata); + Py_ssize_t num_object_ids = PyList_Size(object_id_list); + object_id object_ids[num_object_ids]; + object_buffer object_buffers[num_object_ids]; - /* Remember: The metadata offset was written at the beginning of the plasma buffer. */ - int64_t header_end_offset = *((int64_t*)data); - auto schema_buffer = std::make_shared(metadata, metadata_size); - auto batch = std::shared_ptr(); - ARROW_CHECK_OK(read_batch(schema_buffer, header_end_offset, data + sizeof(size), - size - sizeof(size), &batch)); + for (int i = 0; i < num_object_ids; ++i) { + PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); + } - PyObject* result; - Status s = DeserializeList(batch->column(0), 0, batch->num_rows(), base, &result); - CHECK_SERIALIZATION_ERROR(s); - Py_XDECREF(base); - return result; + Py_BEGIN_ALLOW_THREADS; + plasma_get(conn, object_ids, num_object_ids, timeout_ms, object_buffers); + Py_END_ALLOW_THREADS; + + PyObject* returns = PyList_New(num_object_ids); + for (int i = 0; i < num_object_ids; ++i) { + PyObject* obj_id = PyList_GetItem(object_id_list, i); + PyObject* t = PyTuple_New(2); + Py_XINCREF(obj_id); + PyTuple_SetItem(t, 0, obj_id); + + if (object_buffers[i].data_size != -1) { + /* The object was retrieved, so return the object. */ + object_id* buffer_obj_id = new object_id(object_ids[i]); + /* This keeps a Plasma buffer in scope as long as an object that is backed by that + * buffer is in scope. This prevents memory in the object store from getting + * released while it is still being used to back a Python object. */ + PyObject* base = PyCapsule_New(buffer_obj_id, "buffer", BufferCapsule_Destructor); + PyCapsule_SetContext(base, plasma_conn); + Py_XINCREF(plasma_conn); + + /* Remember: The metadata offset was written at the beginning of the plasma buffer. + */ + int64_t header_end_offset = *((int64_t*)object_buffers[i].data); + auto schema_buffer = std::make_shared( + object_buffers[i].metadata, object_buffers[i].metadata_size); + auto batch = std::shared_ptr(); + ARROW_CHECK_OK(read_batch(schema_buffer, header_end_offset, + object_buffers[i].data + sizeof(object_buffers[i].data_size), + object_buffers[i].data_size - sizeof(object_buffers[i].data_size), &batch)); + + PyObject* result; + Status s = DeserializeList(batch->column(0), 0, batch->num_rows(), base, &result); + CHECK_SERIALIZATION_ERROR(s); + Py_XDECREF(base); + + PyTuple_SetItem(t, 1, result); + } else { + /* The object was not retrieved, so just add None to the list of return + * values. */ + Py_XINCREF(Py_None); + PyTuple_SetItem(t, 1, Py_None); + } + + PyList_SetItem(returns, i, t); + } + + return returns; } #endif // HAS_PLASMA diff --git a/src/plasma/example.c b/src/plasma/example.c deleted file mode 100644 index ffea9f653..000000000 --- a/src/plasma/example.c +++ /dev/null @@ -1,47 +0,0 @@ -/* A simple example on how to use the plasma store - * - * Can be called in the following way: - * - * cd build - * ./plasma_store -s /tmp/plasma_socket - * ./example -s /tmp/plasma_socket -g - * ./example -s /tmp/plasma_socket -c -f */ - -#include -#include -#include -#include - -#include "plasma.h" -#include "plasma_client.h" - -int main(int argc, char *argv[]) { - plasma_connection *conn = NULL; - int64_t size; - uint8_t *data; - int c; - object_id id = {{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, - 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}}; - while ((c = getopt(argc, argv, "s:cfg")) != -1) { - switch (c) { - case 's': - conn = plasma_connect(optarg, NULL, PLASMA_DEFAULT_RELEASE_DELAY); - break; - case 'c': - assert(conn != NULL); - plasma_create(conn, id, 100, NULL, 0, &data); - break; - case 'f': - assert(conn != NULL); - plasma_seal(conn, id); - break; - case 'g': - plasma_get(conn, id, &size, &data, NULL, NULL); - break; - default: - abort(); - } - } - assert(conn != NULL); - plasma_disconnect(conn); -} diff --git a/src/plasma/format/plasma.fbs b/src/plasma/format/plasma.fbs index 91ff13f29..d09f732b9 100644 --- a/src/plasma/format/plasma.fbs +++ b/src/plasma/format/plasma.fbs @@ -10,9 +10,6 @@ enum MessageType:int { // Get an object that is stored on the local Plasma store. PlasmaGetRequest, PlasmaGetReply, - // Non-blocking version of PlasmaGet - PlasmaGetLocalRequest, - PlasmaGetLocalReply, // Release an object. PlasmaReleaseRequest, PlasmaReleaseReply, @@ -109,6 +106,8 @@ table PlasmaSealReply { table PlasmaGetRequest { // IDs of the objects stored at local Plasma store we are getting. object_ids: [string]; + // The number of milliseconds before the request should timeout. + timeout_ms: long; } table PlasmaGetReply { @@ -122,21 +121,6 @@ table PlasmaGetReply { // The number of elements in both object_ids and plasma_objects arrays must agree. } -table PlasmaGetLocalRequest { - // IDs of the objects stored at local Plasma store we are getting. - object_ids: [string]; -} - -table PlasmaGetLocalReply { - // IDs of the objects being returned. - object_ids: [string]; - // Plasma object information, in the same order as their IDs. - plasma_objects: [PlasmaObject]; - // Indication if the object is local it's file descriptor has been transfered, - // same order as their IDs. - has_object: [int]; -} - table PlasmaReleaseRequest { // ID of the object to be released. object_id: string; diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index d6e8bdf81..7e22a3153 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -267,55 +267,104 @@ int plasma_create(plasma_connection *conn, return PlasmaError_OK; } -/* This method is used to get both the data and the metadata. */ void plasma_get(plasma_connection *conn, - object_id obj_id, - int64_t *size, - uint8_t **data, - int64_t *metadata_size, - uint8_t **metadata) { - /* Check if we already have a reference to the object. */ - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &obj_id, sizeof(object_id), object_entry); - plasma_object object_data; + object_id object_ids[], + int64_t num_objects, + int64_t timeout_ms, + object_buffer object_buffers[]) { + /* Fill out the info for the objects that are already in use locally. */ + bool all_present = true; + for (int i = 0; i < num_objects; ++i) { + object_in_use_entry *object_entry; + HASH_FIND(hh, conn->objects_in_use, &object_ids[i], sizeof(object_ids[i]), + object_entry); + if (object_entry == NULL) { + /* This object is not currently in use by this client, so we need to send + * a request to the store. */ + all_present = false; + /* Make a note to ourselves that the object is not present. */ + object_buffers[i].data_size = -1; + } else { + /* */ + plasma_object object_data; + plasma_object *object; + /* NOTE: If the object is still unsealed, we will deadlock, since we must + * have been the one who created it. */ + CHECKM(object_entry->is_sealed, + "Plasma client called get on an unsealed object that it created"); + object = &object_entry->object; + object_buffers[i].data = + lookup_mmapped_file(conn, object->handle.store_fd); + object_buffers[i].data = object_buffers[i].data + object->data_offset; + object_buffers[i].data_size = object->data_size; + object_buffers[i].metadata = object_buffers[i].data + object->data_size; + object_buffers[i].metadata_size = object->metadata_size; + /* Increment the count of the number of instances of this object that this + * client is using. A call to plasma_release is required to decrement this + * count. Cache the reference to the object. */ + increment_object_count(conn, object_ids[i], object, true); + } + } + + if (all_present) { + return; + } + + /* If we get here, then the objects aren't all currently in use by this + * client, so we need to send a request to the plasma store. */ + CHECK(plasma_send_GetRequest(conn->store_conn, conn->builder, object_ids, + num_objects, timeout_ms) >= 0); + uint8_t *reply_data = + plasma_receive(conn->store_conn, MessageType_PlasmaGetReply); + object_id received_obj_ids[num_objects]; + plasma_object object_data[num_objects]; plasma_object *object; - if (object_entry) { - /* If we have already have a reference to the object, use it to get the - * data pointer. - * NOTE: If the object is still unsealed, we will deadlock, since we must - * have been the one who created it. */ - CHECKM(object_entry->is_sealed, - "Plasma client called get on an unsealed object that it created"); - object = &object_entry->object; - *data = lookup_mmapped_file(conn, object->handle.store_fd); - } else { - /* Else, request a reference to the object data from the plasma store. */ - CHECK(plasma_send_GetRequest(conn->store_conn, conn->builder, &obj_id, 1) >= - 0); - uint8_t *reply_data = - plasma_receive(conn->store_conn, MessageType_PlasmaGetReply); - object_id received_obj_id; - plasma_read_GetReply(reply_data, &received_obj_id, &object_data, 1); - free(reply_data); - DCHECK(memcmp(&received_obj_id, &obj_id, sizeof(obj_id)) == 0); - int fd = recv_fd(conn->store_conn); - CHECK(fd >= 0); - object = &object_data; - *data = lookup_or_mmap(conn, fd, object->handle.store_fd, - object->handle.mmap_size); + plasma_read_GetReply(reply_data, received_obj_ids, object_data, num_objects); + free(reply_data); + + for (int i = 0; i < num_objects; ++i) { + DCHECK(object_ids_equal(received_obj_ids[i], object_ids[i])); + object = &object_data[i]; + if (object_buffers[i].data_size != -1) { + /* If the object was already in use by the client, then the store should + * have returned it. */ + DCHECK(object->data_size != -1); + /* We won't use this file descriptor, but the store sent us one, so we + * need to receive it and then close it right away so we don't leak file + * descriptors. */ + int fd = recv_fd(conn->store_conn); + close(fd); + CHECK(fd >= 0); + /* We've already filled out the information for this object, so we can + * just continue. */ + continue; + } + /* If we are here, the object was not currently in use, so we need to + * process the reply from the object store. */ + if (object->data_size != -1) { + /* The object was retrieved. The user will be responsible for releasing + * this object. */ + int fd = recv_fd(conn->store_conn); + CHECK(fd >= 0); + object_buffers[i].data = lookup_or_mmap(conn, fd, object->handle.store_fd, + object->handle.mmap_size); + /* Finish filling out the return values. */ + object_buffers[i].data = object_buffers[i].data + object->data_offset; + object_buffers[i].data_size = object->data_size; + object_buffers[i].metadata = object_buffers[i].data + object->data_size; + object_buffers[i].metadata_size = object->metadata_size; + /* Increment the count of the number of instances of this object that this + * client is using. A call to plasma_release is required to decrement this + * count. Cache the reference to the object. */ + increment_object_count(conn, received_obj_ids[i], object, true); + } else { + /* The object was not retrieved. Make sure we already put a -1 here to + * indicate that the object was not retrieved. The caller is not + * responsible for releasing this object. */ + DCHECK(object_buffers[i].data_size == -1); + object_buffers[i].data_size = -1; + } } - /* Finish filling out the return values. */ - *data = *data + object->data_offset; - *size = object->data_size; - /* If requested, return the metadata as well. */ - if (metadata != NULL) { - *metadata = *data + object->data_size; - *metadata_size = object->metadata_size; - } - /* Increment the count of the number of instances of this object that this - * client is using. A call to plasma_release is required to decrement this - * count. Cache the reference to the object. */ - increment_object_count(conn, obj_id, object, true); } /** @@ -424,26 +473,26 @@ void plasma_contains(plasma_connection *conn, bool plasma_compute_object_hash(plasma_connection *conn, object_id obj_id, unsigned char *digest) { - /* If we don't have the object, return an empty digest. */ - int has_object; - plasma_contains(conn, obj_id, &has_object); - if (!has_object) { + /* Get the plasma object data. We pass in a timeout of 0 to indicate that + * the operation should timeout immediately. */ + object_buffer obj_buffer; + object_id obj_id_array[1] = {obj_id}; + plasma_get(conn, obj_id_array, 1, 0, &obj_buffer); + /* If the object was not retrieved, return false. */ + if (obj_buffer.data_size == -1) { return false; } - /* Get the plasma object data. */ - int64_t size; - uint8_t *data; - int64_t metadata_size; - uint8_t *metadata; - plasma_get(conn, obj_id, &size, &data, &metadata_size, &metadata); /* Compute the hash. */ XXH64_state_t hash_state; XXH64_reset(&hash_state, XXH64_DEFAULT_SEED); - XXH64_update(&hash_state, (unsigned char *) data, size); - XXH64_update(&hash_state, (unsigned char *) metadata, metadata_size); + XXH64_update(&hash_state, (unsigned char *) obj_buffer.data, + obj_buffer.data_size); + XXH64_update(&hash_state, (unsigned char *) obj_buffer.metadata, + obj_buffer.metadata_size); uint64_t hash = XXH64_digest(&hash_state); - DCHECK(DIGEST_SIZE >= sizeof(uint64_t)); - memcpy(digest, &hash, DIGEST_SIZE); + DCHECK(DIGEST_SIZE >= sizeof(hash)); + memset(digest, 0, DIGEST_SIZE); + memcpy(digest, &hash, sizeof(hash)); /* Release the plasma object. */ plasma_release(conn, obj_id); return true; @@ -662,58 +711,6 @@ int get_manager_fd(plasma_connection *conn) { return conn->manager_conn; } -bool plasma_get_local(plasma_connection *conn, - object_id obj_id, - object_buffer *object_buffer) { - CHECK(conn != NULL); - /* Check if we already have a reference to the object. */ - object_in_use_entry *object_entry; - HASH_FIND(hh, conn->objects_in_use, &obj_id, sizeof(obj_id), object_entry); - plasma_object *object; - if (object_entry) { - /* If we have already have a reference to the object, use it to get the - * data pointer. */ - if (!object_entry->is_sealed) { - /* The object is in our local store, but it hasn't been sealed yet. */ - return false; - } - object = &object_entry->object; - object_buffer->data = lookup_mmapped_file(conn, object->handle.store_fd); - } else { - /* Else, request a reference to the object data from the plasma store. */ - CHECK(plasma_send_GetLocalRequest(conn->store_conn, conn->builder, &obj_id, - 1) >= 0); - uint8_t *reply_data = - plasma_receive(conn->store_conn, MessageType_PlasmaGetLocalReply); - plasma_object object_data; - int has_object; - plasma_read_GetLocalReply(reply_data, &obj_id, &object_data, &has_object, - 1); - free(reply_data); - - if (!has_object) { - /* The object is not in our local store. */ - return false; - } - - int fd = recv_fd(conn->store_conn); - CHECKM(fd >= 0, "recv_fd not successful"); - object = &object_data; - object_buffer->data = lookup_or_mmap(conn, fd, object->handle.store_fd, - object->handle.mmap_size); - } - /* Finish filling out the return values. */ - object_buffer->data += object->data_offset; - object_buffer->data_size = object->data_size; - object_buffer->metadata = object_buffer->data + object->data_size; - object_buffer->metadata_size = object->metadata_size; - /* Increment the count of the number of instances of this object that this - * client is using. A call to plasma_release is required to decrement this - * count. Cache the reference to the object. */ - increment_object_count(conn, obj_id, object, true); - return true; -} - int plasma_status(plasma_connection *conn, object_id object_id) { CHECK(conn != NULL); CHECK(conn->manager_conn >= 0); @@ -774,204 +771,3 @@ int plasma_wait(plasma_connection *conn, } return num_objects_ready; } - -/* - * TODO: maybe move the plasma_client_* functions in another file. - * - * plasma_client_* represent functions implemented by client; so probably - * need to be in a different file. - */ - -void plasma_client_get(plasma_connection *conn, - object_id obj_id, - object_buffer *object_buffer) { - CHECK(conn != NULL); - CHECK(conn->manager_conn >= 0); - - object_request request; - request.object_id = obj_id; - - while (true) { - if (plasma_get_local(conn, obj_id, object_buffer)) { - /* Object is in the local Plasma Store, and it is sealed. */ - return; - } - - object_id object_ids[1] = {obj_id}; - plasma_fetch(conn, 1, object_ids); - switch (plasma_status(conn, obj_id)) { - case ObjectStatus_Local: - /* Object has finished being transfered just after calling - * plasma_get_local(), and it is now in the local Plasma Store. Loop again - * to call plasma_get_local() and eventually return. */ - continue; - case ObjectStatus_Remote: - /* A fetch request has been already scheduled for obj_id, so wait for - * it to complete. */ - request.type = PLASMA_QUERY_LOCAL; - break; - case ObjectStatus_Nonexistent: - /* Object doesn’t exist in the system so ask local scheduler to create it. - */ - /* TODO: scheduler_create_object(obj_id); */ - /* Wait for the object to be (re)constructed and sealed either in the - * local Plasma Store or remotely. */ - request.type = PLASMA_QUERY_ANYWHERE; - break; - default: - CHECKM(0, "Unrecognizable object status.") - } - -/* - * Wait for obj_id to (1) be transferred and sealed in the local - * Plasma Store, if available remotely, or (2) be (re)constructued either - * locally or remotely, if obj_id didn't exist in the system. - * - if timeout, next iteration will retry plasma_fetch() or - * scheduler_create_object() - * - if request.status == ObjectStatus_Local, next iteration - * will get object and return - * - if request.status == ObjectStatus_Remote, next iteration - * will call plasma_fetch() - * - if request.status == ObjectStatus_Nonexistent, next iteration - * will call scheduler_create_object() - */ -#define TIMEOUT_WAIT_MS 200 - plasma_wait(conn, 1, &request, 1, TIMEOUT_WAIT_MS); - } -} - -int plasma_client_wait(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - uint64_t timeout, - int num_returns, - object_id return_object_ids[]) { - CHECK(conn->manager_conn >= 0); - CHECK(num_object_ids >= num_returns); - - object_request requests[num_object_ids]; - - /* Initialize array of object requests. We only care for the objects to be - * present in the system, not necessary in the local Plasma Store. Thus, we - * set the request type to PLASMA_QUERY_ANYWHERE. */ - for (int i = 0; i < num_object_ids; ++i) { - requests[i].object_id = object_ids[i]; - requests[i].type = PLASMA_QUERY_ANYWHERE; - } - - /* Loop until we get num_returns objects stored in the system either in the - * local Plasma Store or remotely. */ - uint64_t remaining_timeout = timeout; - while (true) { - struct timeval start, end; - gettimeofday(&start, NULL); - - int n = plasma_wait(conn, num_object_ids, requests, num_returns, - MIN(remaining_timeout, TIMEOUT_WAIT_MS)); - - gettimeofday(&end, NULL); - float diff_ms = (end.tv_sec - start.tv_sec); - diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; - remaining_timeout = - (remaining_timeout >= diff_ms ? remaining_timeout - diff_ms : 0); - - if (n >= num_returns || remaining_timeout == 0) { - /* Either (1) num_returns requests are satisfied or (2) timeout expired. - * In both cases we return. */ - int idx_returns = 0; - - for (int i = 0; i < num_returns; ++i) { - if (requests[i].status == ObjectStatus_Local || - requests[i].status == ObjectStatus_Remote) { - return_object_ids[idx_returns] = requests[i].object_id; - idx_returns += 1; - } - } - return idx_returns; - } - /* The timeout hasn't expired and we got less than num_returns in the - * system. Trigger reconstruction of the missing objects. */ - for (int i = 0; i < num_returns; ++i) { - if (requests[i].status == ObjectStatus_Nonexistent) { - /* Object doesn’t exist in the system so ask local scheduler to create - * object with ID requests[i].object_id. */ - /* TODO: scheduler_create_object(object_id); */ - printf("XXX Need to schedule object -- not implemented yet!\n"); - /* Subscribe to hear back when object_id is sealed. */ - } - } - } -} - -void plasma_client_multiget(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - object_buffer object_buffers[]) { - object_request requests[num_object_ids]; - - /* Set all request types to ObjectStatus_Local, as we want to get all objects - * into the local Plasma Store. */ - for (int i = 0; i < num_object_ids; ++i) { - requests[i].object_id = object_ids[i]; - requests[i].type = PLASMA_QUERY_LOCAL; - } - - while (true) { - int n; - - /* Issue a fetch command so the object IDs end up locally. */ - plasma_fetch(conn, num_object_ids, object_ids); - - /* Wait to get all objects in the system. The reason we call plasma_wait() - * here instead of iterating over plasma_client_get() is to increase - * concurrency as plasma_client_get() is blocking. */ - n = plasma_wait(conn, num_object_ids, requests, num_object_ids, - TIMEOUT_WAIT_MS); - - if (n == num_object_ids) { - /* All objects are in the system either on the local or a remote Plasma - * store, so we are done. */ - break; - } - } - - /* Now get the data for every object. */ - for (int i = 0; i < num_object_ids; ++i) { - plasma_client_get(conn, object_ids[i], &object_buffers[i]); - } -} - -/** - * TODO: maybe move object_requests_* functions in another file. - * The object_request data structure is defined in plasma.h since - * it is used by plasma_request and plasma_reply, but there is no - * plasma.c file. - */ -void object_requests_copy(int num_object_requests, - object_request object_requests_dst[], - object_request object_requests_src[]) { - for (int i = 0; i < num_object_requests; ++i) { - object_requests_dst[i].object_id = object_requests_src[i].object_id; - object_requests_dst[i].type = object_requests_src[i].type; - object_requests_dst[i].status = object_requests_src[i].type; - } -} - -object_request *object_requests_get_object(object_id object_id, - int num_object_requests, - object_request object_requests[]) { - for (int i = 0; i < num_object_requests; ++i) { - if (object_ids_equal(object_requests[i].object_id, object_id)) { - return &object_requests[i]; - } - } - return NULL; -} - -void object_requests_set_status_all(int num_object_requests, - object_request object_requests[], - int status) { - for (int i = 0; i < num_object_requests; ++i) { - object_requests[i].status = status; - } -} diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index d54a2c942..c5a01357c 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -97,26 +97,39 @@ int plasma_create(plasma_connection *conn, uint8_t **data); /** - * Get an object from the Plasma Store. This function will block until the - * object has been created and sealed in the Plasma Store. + * Object buffer data structure. + */ +typedef struct { + /** The size in bytes of the data object. */ + int64_t data_size; + /** The address of the data object. */ + uint8_t *data; + /** The metadata size in bytes. */ + int64_t metadata_size; + /** The address of the metadata. */ + uint8_t *metadata; +} object_buffer; + +/** + * Get some objects from the Plasma Store. This function will block until the + * objects have all been created and sealed in the Plasma Store or the timeout + * expires. The caller is responsible for releasing any retrieved objects, but + * the caller should not release objects that were not retrieved. * * @param conn The object containing the connection state. - * @param object_id The ID of the object to get. - * @param size The size in bytes of the retrieved object will be written at this - address. - * @param data The address of the object will be written at this address. - * @param metadata_size The size in bytes of the object's metadata will be - * written at this address. - * @param metadata The address of the object's metadata will be written at this - * address. + * @param object_ids The IDs of the objects to get. + * @param num_object_ids The number of object IDs to get. + * @param timeout_ms The amount of time in milliseconds to wait before this + * request times out. If this value is -1, then no timeout is set. + * @param object_buffers An array where the results will be stored. If the data + * size field is -1, then the object was not retrieved. * @return Void. */ void plasma_get(plasma_connection *conn, - object_id object_id, - int64_t *size, - uint8_t **data, - int64_t *metadata_size, - uint8_t **metadata); + object_id object_ids[], + int64_t num_objects, + int64_t timeout_ms, + object_buffer object_buffers[]); /** * Tell Plasma that the client no longer needs the object. This should be called @@ -252,34 +265,6 @@ int plasma_subscribe(plasma_connection *conn); */ int get_manager_fd(plasma_connection *conn); -/** - * Object buffer data structure. - */ -typedef struct { - /** The size in bytes of the data object. */ - int64_t data_size; - /** The address of the data object. */ - uint8_t *data; - /** The metadata size in bytes. */ - int64_t metadata_size; - /** The address of the metadata. */ - uint8_t *metadata; -} object_buffer; - -/** - * Get specified object from the local Plasma Store. This function is - * non-blocking. - * - * @param conn The object containing the connection state. - * @param object_id The ID of the object to get. - * @param object_buffer The data structure where the object information will - * be written, including object payload and metadata. - * @return True if the object is returned and false otherwise. - */ -bool plasma_get_local(plasma_connection *conn, - object_id object_id, - object_buffer *object_buffer); - /** * Return the status of a given object. This method may query the object table. * @@ -347,111 +332,4 @@ int plasma_wait(plasma_connection *conn, int num_ready_objects, uint64_t timeout_ms); -/** - * TODO: maybe move the plasma_client_* functions in another file. - * - * plasma_client_* represent functions implemented by client; so probably - * need to be in a different file. - */ - -/** - * Get an object from the Plasma Store. This function will block until the - * object has been created and sealed in the Plasma Store. - * - * @param conn The object containing the connection state. - * @param object_id The ID of the object to get. - * @param object_buffer The data structure where the object information will be - * written, including object payload and metadata. - * @return Void. - */ -void plasma_client_get(plasma_connection *conn, - object_id object_id, - object_buffer *object_buffer); - -/** - * Wait for objects to be created (right now, wait for local objects). - * - * @param conn The object containing the connection state. - * @param num_object_ids Number of object IDs wait is called on. - * @param object_ids Object IDs wait is called on. - * @param timeout Wait will time out and return after this number of ms. - * @param num_returns Number of object IDs wait will return if it doesn't time - * out. - * @param return_object_ids Out parameter for the object IDs returned by wait. - * This is an array of size num_returns. If the number of objects that - * are ready when we time out, the objects will be stored in the last - * slots of the array and the number of objects is returned. - * @return Number of objects that are actually ready. - */ -int plasma_client_wait(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - uint64_t timeout, - int num_returns, - object_id return_object_ids[]); - -/** - * Get an array of objects from the Plasma Store. This function will block until - * all object in the array have been created and sealed in the Plasma Store. - * - * @param conn The object containing the connection state. - * @param num_object_ids The number of objects in the array to be returned. - * @param object_ids The array of object IDs to be returned. - * @param object_buffers The array of data structure where the information of - * the return objects will be stored. The objects appear in the same - * order as their IDs in the object_ids array, - * @return Void. - */ -void plasma_client_multiget(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - object_buffer object_buffers[]); - -/** - * TODO: maybe move object_requests_* functions in another file. - * The object_request data structure is defined in plasma.h since - * it is used by plasma_request and plasma_reply, but there is no - * plasma.c file. - */ - -/** - * Copy an array of object requests into another one. - * - * @param num_object_requests Number of elements in the object_requests arrays. - * @param object_requests_dst Destination object_requests array. - * @param object_requests_dst Source object_requests array. - * @return None. - */ -void object_requests_copy(int num_object_requests, - object_request object_requests_dst[], - object_request object_requests_src[]); - -/** - * Given an object ID, get the corresponding object request - * form an array of object requests. - * - * @param object_id Identifier of the requested object. - * @param num_object_requests Number of elements in the object requests array. - * @param object_requests The array of object requests which - * contains the object (object_id). - * @return Object request, if found; NULL, if not found. - */ -object_request *object_requests_get_object(object_id object_id, - int num_object_requests, - object_request object_requests[]); - -/** - * Initialize status of all object requests in an array. - * - * @param num_object_requests Number of elements in the array of object - * requests. - * @param object_requests Array of object requests. - * @param status Value with which we initialize the status of each object - * request in the array. - * @return Void. - */ -void object_requests_set_status_all(int num_object_requests, - object_request object_requests[], - int status); - #endif /* PLASMA_CLIENT_H */ diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.c index bff7e6d9e..b8960d14a 100644 --- a/src/plasma/plasma_extension.c +++ b/src/plasma/plasma_extension.c @@ -126,36 +126,56 @@ PyObject *PyPlasma_release(PyObject *self, PyObject *args) { PyObject *PyPlasma_get(PyObject *self, PyObject *args) { plasma_connection *conn; - object_id object_id; - if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaConnection, &conn, - PyStringToUniqueID, &object_id)) { + PyObject *object_id_list; + long long timeout_ms; + if (!PyArg_ParseTuple(args, "O&OL", PyObjectToPlasmaConnection, &conn, + &object_id_list, &timeout_ms)) { return NULL; } - int64_t size; - uint8_t *data; - int64_t metadata_size; - uint8_t *metadata; + + Py_ssize_t num_object_ids = PyList_Size(object_id_list); + object_id object_ids[num_object_ids]; + object_buffer object_buffers[num_object_ids]; + + for (int i = 0; i < num_object_ids; ++i) { + PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); + } + Py_BEGIN_ALLOW_THREADS; - plasma_get(conn, object_id, &size, &data, &metadata_size, &metadata); + plasma_get(conn, object_ids, num_object_ids, timeout_ms, object_buffers); Py_END_ALLOW_THREADS; - PyObject *t = PyTuple_New(2); -#if PY_MAJOR_VERSION >= 3 - PyTuple_SetItem(t, 0, PyMemoryView_FromMemory((void *) data, - (Py_ssize_t) size, PyBUF_READ)); -#else - PyTuple_SetItem(t, 0, PyBuffer_FromMemory((void *) data, (Py_ssize_t) size)); -#endif + PyObject *returns = PyList_New(num_object_ids); + for (int i = 0; i < num_object_ids; ++i) { + if (object_buffers[i].data_size != -1) { + /* The object was retrieved, so return the object. */ + PyObject *t = PyTuple_New(2); #if PY_MAJOR_VERSION >= 3 - PyTuple_SetItem( - t, 1, PyMemoryView_FromMemory((void *) metadata, - (Py_ssize_t) metadata_size, PyBUF_READ)); + PyTuple_SetItem( + t, 0, PyMemoryView_FromMemory( + (void *) object_buffers[i].data, + (Py_ssize_t) object_buffers[i].data_size, PyBUF_READ)); + PyTuple_SetItem( + t, 1, PyMemoryView_FromMemory( + (void *) object_buffers[i].metadata, + (Py_ssize_t) object_buffers[i].metadata_size, PyBUF_READ)); #else - PyTuple_SetItem( - t, 1, PyBuffer_FromMemory((void *) metadata, (Py_ssize_t) metadata_size)); + PyTuple_SetItem( + t, 0, PyBuffer_FromMemory((void *) object_buffers[i].data, + (Py_ssize_t) object_buffers[i].data_size)); + PyTuple_SetItem(t, 1, PyBuffer_FromMemory( + (void *) object_buffers[i].metadata, + (Py_ssize_t) object_buffers[i].metadata_size)); #endif - - return t; + PyList_SetItem(returns, i, t); + } else { + /* The object was not retrieved, so just add None to the list of return + * values. */ + Py_XINCREF(Py_None); + PyList_SetItem(returns, i, Py_None); + } + } + return returns; } PyObject *PyPlasma_contains(PyObject *self, PyObject *args) { diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 0ef839adf..cab45292d 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -340,7 +340,8 @@ void remove_wait_request_for_object(plasma_manager_state *manager_state, } } /* In principle, if there are no more wait requests involving this object - * ID, then we could remove the object_wait_reqs struct. */ + * ID, then we could remove the object_wait_reqs struct. However, the + * object_wait_reqs struct gets removed in update_object_wait_requests. */ } } @@ -383,9 +384,17 @@ void update_object_wait_requests(plasma_manager_state *manager_state, HASH_FIND(hh, *object_wait_requests_table_ptr, &obj_id, sizeof(obj_id), object_wait_reqs); if (object_wait_reqs != NULL) { - for (int i = 0; i < utarray_len(object_wait_reqs->wait_requests); ++i) { - wait_request **wait_req_ptr = - (wait_request **) utarray_eltptr(object_wait_reqs->wait_requests, i); + /* We compute the number of requests first because the length of the utarray + * will change as we iterate over it (because each call to return_from_wait + * will remove one element). */ + int num_requests = utarray_len(object_wait_reqs->wait_requests); + /* The argument index is the index of the current element of the utarray + * that we are processing. It may differ from the counter i when elements + * are removed from the array. */ + int index = 0; + for (int i = 0; i < num_requests; ++i) { + wait_request **wait_req_ptr = (wait_request **) utarray_eltptr( + object_wait_reqs->wait_requests, index); wait_request *wait_req = *wait_req_ptr; wait_req->num_satisfied += 1; /* Mark the object as present in the wait request. */ @@ -404,8 +413,13 @@ void update_object_wait_requests(plasma_manager_state *manager_state, /* If this wait request is done, reply to the client. */ if (wait_req->num_satisfied == wait_req->num_objects_to_wait_for) { return_from_wait(manager_state, wait_req); + } else { + /* The call to return_from_wait will remove the current element in the + * array, so we only increment the counter in the else branch. */ + index += 1; } } + DCHECK(index == utarray_len(object_wait_reqs->wait_requests)); /* Remove the array of wait requests for this object, since no one should be * waiting for this object anymore. */ HASH_DELETE(hh, *object_wait_requests_table_ptr, object_wait_reqs); @@ -721,7 +735,7 @@ client_connection *get_manager_connection(plasma_manager_state *state, } void process_transfer_request(event_loop *loop, - object_id object_id, + object_id obj_id, const char *addr, int port, client_connection *conn) { @@ -738,25 +752,27 @@ void process_transfer_request(event_loop *loop, * do a non-blocking get call on the store, and if the object isn't there then * perhaps the manager should initiate the transfer when it receives a * notification from the store that the object is present. */ - int has_obj; + object_buffer obj_buffer; int counter = 0; do { - plasma_contains(conn->manager_state->plasma_conn, object_id, &has_obj); + /* We pass in 0 to indicate that the command should return immediately. */ + object_id obj_id_array[1] = {obj_id}; + plasma_get(conn->manager_state->plasma_conn, obj_id_array, 1, 0, + &obj_buffer); if (counter > 0) { LOG_WARN("Blocking in the plasma manager."); } counter += 1; - } while (!has_obj); - plasma_get(conn->manager_state->plasma_conn, object_id, &data_size, &data, - &metadata_size, &metadata); - assert(metadata == data + data_size); + } while (obj_buffer.data_size == -1); + DCHECK(obj_buffer.metadata == obj_buffer.data + obj_buffer.data_size); plasma_request_buffer *buf = malloc(sizeof(plasma_request_buffer)); buf->type = MessageType_PlasmaDataReply; - buf->object_id = object_id; - buf->data = data; /* We treat this as a pointer to the - concatenated data and metadata. */ - buf->data_size = data_size; - buf->metadata_size = metadata_size; + buf->object_id = obj_id; + /* We treat buf->data as a pointer to the concatenated data and metadata, so + * we don't actually use buf->metadata. */ + buf->data = obj_buffer.data; + buf->data_size = obj_buffer.data_size; + buf->metadata_size = obj_buffer.metadata_size; client_connection *manager_conn = get_manager_connection(conn->manager_state, addr, port); diff --git a/src/plasma/plasma_protocol.c b/src/plasma/plasma_protocol.c index 1c96fea9a..ffd7a8847 100644 --- a/src/plasma/plasma_protocol.c +++ b/src/plasma/plasma_protocol.c @@ -402,10 +402,12 @@ void plasma_read_EvictReply(uint8_t *data, int64_t *num_bytes) { int plasma_send_GetRequest(int sock, protocol_builder *B, object_id object_ids[], - int64_t num_objects) { + int64_t num_objects, + int64_t timeout_ms) { PlasmaGetRequest_start_as_root(B); PlasmaGetRequest_object_ids_add( B, object_ids_to_flatbuffer(B, object_ids, num_objects)); + PlasmaGetRequest_timeout_ms_add(B, timeout_ms); PlasmaGetRequest_end_as_root(B); return finalize_buffer_and_send(B, sock, MessageType_PlasmaGetRequest); } @@ -418,11 +420,13 @@ int64_t plasma_read_GetRequest_num_objects(uint8_t *data) { void plasma_read_GetRequest(uint8_t *data, object_id object_ids[], + int64_t *timeout_ms, int64_t num_objects) { DCHECK(data); PlasmaGetRequest_table_t req = PlasmaGetRequest_as_root(data); flatbuffers_string_vec_t object_id_vector = PlasmaGetRequest_object_ids(req); object_ids_from_flatbuffer(object_id_vector, object_ids, num_objects); + *timeout_ms = PlasmaGetRequest_timeout_ms(req); } int plasma_send_GetReply(int sock, @@ -478,100 +482,6 @@ void plasma_read_GetReply(uint8_t *data, } } -/* Plasma get local messages. */ - -int plasma_send_GetLocalRequest(int sock, - protocol_builder *B, - object_id object_ids[], - int64_t num_objects) { - PlasmaGetLocalRequest_start_as_root(B); - PlasmaGetLocalRequest_object_ids_add( - B, object_ids_to_flatbuffer(B, object_ids, num_objects)); - PlasmaGetLocalRequest_end_as_root(B); - return finalize_buffer_and_send(B, sock, MessageType_PlasmaGetLocalRequest); -} - -int64_t plasma_read_GetLocalRequest_num_objects(uint8_t *data) { - DCHECK(data); - PlasmaGetLocalRequest_table_t req = PlasmaGetLocalRequest_as_root(data); - return flatbuffers_string_vec_len(PlasmaGetLocalRequest_object_ids(req)); -} - -void plasma_read_GetLocalRequest(uint8_t *data, - object_id object_ids[], - int64_t num_objects) { - DCHECK(data); - PlasmaGetLocalRequest_table_t req = PlasmaGetLocalRequest_as_root(data); - flatbuffers_string_vec_t object_id_vector = - PlasmaGetLocalRequest_object_ids(req); - object_ids_from_flatbuffer(object_id_vector, object_ids, num_objects); -} - -int plasma_send_GetLocalReply(int sock, - protocol_builder *B, - object_id object_ids[], - plasma_object plasma_objects[], - int has_object[], - int64_t num_objects) { - PlasmaGetLocalReply_start_as_root(B); - - flatbuffers_string_vec_ref_t ids = - object_ids_to_flatbuffer(B, object_ids, num_objects); - PlasmaGetLocalReply_object_ids_add(B, ids); - - PlasmaObject_vec_start(B); - for (int i = 0; i < num_objects; ++i) { - plasma_object obj = plasma_objects[i]; - PlasmaObject_t plasma_obj; - memset(&plasma_obj, 0, sizeof(PlasmaObject_t)); - plasma_obj.segment_index = obj.handle.store_fd; - plasma_obj.mmap_size = obj.handle.mmap_size; - plasma_obj.data_offset = obj.data_offset; - plasma_obj.data_size = obj.data_size; - plasma_obj.metadata_offset = obj.metadata_offset; - plasma_obj.metadata_size = obj.metadata_size; - PlasmaObject_vec_push(B, &plasma_obj); - } - PlasmaObject_vec_ref_t object_vec = PlasmaObject_vec_end(B); - PlasmaGetLocalReply_plasma_objects_add(B, object_vec); - - flatbuffers_int32_vec_start(B); - for (int64_t i = 0; i < num_objects; ++i) { - flatbuffers_int32_vec_push(B, &has_object[i]); - } - PlasmaGetLocalReply_has_object_add(B, flatbuffers_int32_vec_end(B)); - PlasmaGetLocalReply_end_as_root(B); - return finalize_buffer_and_send(B, sock, MessageType_PlasmaGetLocalReply); -} - -void plasma_read_GetLocalReply(uint8_t *data, - object_id object_ids[], - plasma_object plasma_objects[], - int has_object[], - int64_t num_objects) { - CHECK(data); - PlasmaGetLocalReply_table_t req = PlasmaGetLocalReply_as_root(data); - flatbuffers_string_vec_t object_id_vector = - PlasmaGetLocalReply_object_ids(req); - object_ids_from_flatbuffer(object_id_vector, object_ids, num_objects); - - memset(plasma_objects, 0, sizeof(plasma_object) * num_objects); - PlasmaObject_vec_t plasma_objects_vector = - PlasmaGetLocalReply_plasma_objects(req); - - for (int i = 0; i < num_objects; ++i) { - PlasmaObject_struct_t obj = PlasmaObject_vec_at(plasma_objects_vector, i); - plasma_objects[i].handle.store_fd = PlasmaObject_segment_index(obj); - plasma_objects[i].handle.mmap_size = PlasmaObject_mmap_size(obj); - plasma_objects[i].data_offset = PlasmaObject_data_offset(obj); - plasma_objects[i].data_size = PlasmaObject_data_size(obj); - plasma_objects[i].metadata_offset = PlasmaObject_metadata_offset(obj); - plasma_objects[i].metadata_size = PlasmaObject_metadata_size(obj); - has_object[i] = - flatbuffers_int32_vec_at(PlasmaGetLocalReply_has_object(req), i); - } -} - /* Plasma fetch messages. */ int plasma_send_FetchRequest(int sock, diff --git a/src/plasma/plasma_protocol.h b/src/plasma/plasma_protocol.h index 5bdbad9d8..f716f3482 100644 --- a/src/plasma/plasma_protocol.h +++ b/src/plasma/plasma_protocol.h @@ -69,12 +69,14 @@ void plasma_read_SealReply(uint8_t *data, object_id *object_id, int *error); int plasma_send_GetRequest(int sock, protocol_builder *B, object_id object_ids[], - int64_t num_objects); + int64_t num_objects, + int64_t timeout_ms); int64_t plasma_read_GetRequest_num_objects(uint8_t *data); void plasma_read_GetRequest(uint8_t *data, object_id object_ids[], + int64_t *timeout_ms, int64_t num_objects); int plasma_send_GetReply(int sock, @@ -88,32 +90,6 @@ void plasma_read_GetReply(uint8_t *data, plasma_object plasma_objects[], int64_t num_objects); -/* Plasma Get local message functions. */ - -int plasma_send_GetLocalRequest(int sock, - protocol_builder *B, - object_id object_ids[], - int64_t num_objects); - -int64_t plasma_read_GetLocalRequest_num_objects(uint8_t *data); - -void plasma_read_GetLocalRequest(uint8_t *data, - object_id object_ids[], - int64_t num_objects); - -int plasma_send_GetLocalReply(int sock, - protocol_builder *B, - object_id object_ids[], - plasma_object plasma_objects[], - int has_object[], - int64_t num_objects); - -void plasma_read_GetLocalReply(uint8_t *data, - object_id object_ids[], - plasma_object plasma_objects[], - int has_object[], - int64_t num_objects); - /* Plasma Release message functions. */ int plasma_send_ReleaseRequest(int sock, diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index 22bb5854c..1c946afbc 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -40,15 +40,6 @@ void *dlmalloc(size_t); void dlfree(void *); -typedef struct { - /* Object id of this object. */ - object_id object_id; - /* An array of the clients that are waiting to get this object. */ - UT_array *waiting_clients; - /* Handle for the uthash table. */ - UT_hash_handle handle; -} object_notify_entry; - /** Contains all information that is associated with a Plasma store client. */ struct client { /** The socket used to communicate with the client. */ @@ -71,11 +62,46 @@ typedef struct { UT_hash_handle hh; } notification_queue; +typedef struct { + /** The client connection that called get. */ + client *client; + /** The ID of the timer that will time out and cause this wait to return to + * the client if it hasn't already returned. */ + int64_t timer; + /** The number of objects in this get request. */ + int64_t num_object_ids; + /** The object IDs involved in this request. This is used in the reply. */ + object_id *object_ids; + /** The object information for the objects in this request. This is used in + * the reply. */ + plasma_object *objects; + /** The minimum number of objects to wait for in this request. */ + int64_t num_objects_to_wait_for; + /** The number of object requests in this wait request that are already + * satisfied. */ + int64_t num_satisfied; +} get_request; + +typedef struct { + /** The ID of the object. This is used as a key in a hash table. */ + object_id object_id; + /** An array of the get requests involving this object ID. */ + UT_array *get_requests; + /** Handle for the uthash table in the store state that keeps track of the get + * requests involving this object ID. */ + UT_hash_handle hh; +} object_get_requests; + +/** This is used to define the utarray of get requests in the + * object_get_requests struct. */ +UT_icd get_request_icd = {sizeof(get_request *), NULL, NULL, NULL}; + struct plasma_store_state { /* Event loop of the plasma store. */ event_loop *loop; - /* Objects that processes are waiting for. */ - object_notify_entry *objects_notify; + /** A hash table mapping object IDs to a vector of the get requests that are + * waiting for the object to arrive. */ + object_get_requests *object_get_requests; /** The pending notifications that have not been sent to subscribers because * the socket send buffers were full. This is a hash table from client file * descriptor to an array of object_ids to send to that client. */ @@ -97,7 +123,7 @@ UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL}; plasma_store_state *init_plasma_store(event_loop *loop, int64_t system_memory) { plasma_store_state *state = malloc(sizeof(plasma_store_state)); state->loop = loop; - state->objects_notify = NULL; + state->object_get_requests = NULL; state->pending_notifications = NULL; /* Initialize the plasma store info. */ state->plasma_store_info = malloc(sizeof(plasma_store_info)); @@ -208,42 +234,243 @@ int create_object(client *client_context, return PlasmaError_OK; } -/* Get an object from the hash table. */ -int get_object(client *client_context, - int conn, - object_id object_id, - plasma_object *result) { - plasma_store_state *plasma_state = client_context->plasma_state; - object_table_entry *entry; - HASH_FIND(handle, plasma_state->plasma_store_info->objects, &object_id, - sizeof(object_id), entry); - if (entry && entry->state == PLASMA_SEALED) { - result->handle.store_fd = entry->fd; - result->handle.mmap_size = entry->map_size; - result->data_offset = entry->offset; - result->metadata_offset = entry->offset + entry->info.data_size; - result->data_size = entry->info.data_size; - result->metadata_size = entry->info.metadata_size; - /* If necessary, record that this client is using this object. In the case - * where entry == NULL, this will be called from seal_object. */ - add_client_to_object_clients(entry, client_context); - return OBJECT_FOUND; - } else { - object_notify_entry *notify_entry; - LOG_DEBUG("object not in hash table of sealed objects"); - HASH_FIND(handle, plasma_state->objects_notify, &object_id, - sizeof(object_id), notify_entry); - if (!notify_entry) { - notify_entry = malloc(sizeof(object_notify_entry)); - memset(notify_entry, 0, sizeof(object_notify_entry)); - utarray_new(notify_entry->waiting_clients, &client_icd); - notify_entry->object_id = object_id; - HASH_ADD(handle, plasma_state->objects_notify, object_id, - sizeof(object_id), notify_entry); - } - utarray_push_back(notify_entry->waiting_clients, &client_context); +void add_get_request_for_object(plasma_store_state *store_state, + object_id object_id, + get_request *get_req) { + object_get_requests *object_get_reqs; + HASH_FIND(hh, store_state->object_get_requests, &object_id, sizeof(object_id), + object_get_reqs); + /* If there are currently no get requests involving this object ID, create a + * new object_get_requests struct for this object ID and add it to the hash + * table. */ + if (object_get_reqs == NULL) { + object_get_reqs = malloc(sizeof(object_get_requests)); + object_get_reqs->object_id = object_id; + utarray_new(object_get_reqs->get_requests, &get_request_icd); + HASH_ADD(hh, store_state->object_get_requests, object_id, + sizeof(object_get_reqs->object_id), object_get_reqs); + } + /* Add this get request to the vector of get requests involving this object + * ID. */ + utarray_push_back(object_get_reqs->get_requests, &get_req); +} + +void remove_get_request_for_object(plasma_store_state *store_state, + object_id object_id, + get_request *get_req) { + object_get_requests *object_get_reqs; + HASH_FIND(hh, store_state->object_get_requests, &object_id, sizeof(object_id), + object_get_reqs); + /* If there is a vector of get requests for this object ID, and if this vector + * contains the get request, then remove the get request from the vector. */ + if (object_get_reqs != NULL) { + for (int i = 0; i < utarray_len(object_get_reqs->get_requests); ++i) { + get_request **get_req_ptr = + (get_request **) utarray_eltptr(object_get_reqs->get_requests, i); + if (*get_req_ptr == get_req) { + /* Remove the get request from the array. */ + utarray_erase(object_get_reqs->get_requests, i, 1); + break; + } + } + /* In principle, if there are no more get requests involving this object ID, + * then we could remove the object_get_reqs struct. However, the + * object_get_reqs struct gets removed in update_object_get_requests. */ + } +} + +void remove_get_request(plasma_store_state *store_state, get_request *get_req) { + if (get_req->timer != -1) { + CHECK(event_loop_remove_timer(store_state->loop, get_req->timer) == AE_OK); + } + free(get_req->object_ids); + free(get_req->objects); + free(get_req); +} + +void initialize_plasma_object(plasma_object *object, + object_table_entry *entry) { + DCHECK(object != NULL); + DCHECK(entry != NULL); + DCHECK(entry->state == PLASMA_SEALED); + object->handle.store_fd = entry->fd; + object->handle.mmap_size = entry->map_size; + object->data_offset = entry->offset; + object->metadata_offset = entry->offset + entry->info.data_size; + object->data_size = entry->info.data_size; + object->metadata_size = entry->info.metadata_size; +} + +void return_from_get(plasma_store_state *store_state, get_request *get_req) { + /* Send the get reply to the client. */ + int status = plasma_send_GetReply(get_req->client->sock, store_state->builder, + get_req->object_ids, get_req->objects, + get_req->num_object_ids); + warn_if_sigpipe(status, get_req->client->sock); + /* If we successfully sent the get reply message to the client, then also send + * the file descriptors. */ + if (status >= 0) { + /* Send all of the file descriptors for the present objects. */ + for (int i = 0; i < get_req->num_object_ids; ++i) { + /* We use the data size to indicate whether the object is present or not. + */ + if (get_req->objects[i].data_size != -1) { + int error_code = + send_fd(get_req->client->sock, get_req->objects[i].handle.store_fd); + /* If we failed to send the file descriptor, loop until we have sent it + * successfully. TODO(rkn): This is problematic for two reasons. First + * of all, sending the file descriptor should just succeed without any + * errors, but sometimes I see a "Message too long" error number. + * Second, looping like this allows a client to potentially block the + * plasma store event loop which should never happen. */ + while (error_code < 0) { + if (errno == EMSGSIZE) { + LOG_WARN("Failed to send file descriptor, retrying."); + error_code = send_fd(get_req->client->sock, + get_req->objects[i].handle.store_fd); + continue; + } + warn_if_sigpipe(error_code, get_req->client->sock); + break; + } + } + } + } + + /* Remove the get request from each of the relevant object_get_requests hash + * tables if it is present there. It should only be present there if the get + * request timed out. */ + for (int i = 0; i < get_req->num_object_ids; ++i) { + remove_get_request_for_object(store_state, get_req->object_ids[i], get_req); + } + /* Remove the get request. */ + remove_get_request(store_state, get_req); +} + +void update_object_get_requests(plasma_store_state *store_state, + object_id obj_id) { + /* Update the in-progress get requests. */ + object_get_requests *object_get_reqs; + HASH_FIND(hh, store_state->object_get_requests, &obj_id, sizeof(obj_id), + object_get_reqs); + if (object_get_reqs != NULL) { + /* We compute the number of requests first because the length of the utarray + * will change as we iterate over it (because each call to return_from_get + * will remove one element). */ + int num_requests = utarray_len(object_get_reqs->get_requests); + /* The argument index is the index of the current element of the utarray + * that we are processing. It may differ from the counter i when elements + * are removed from the array. */ + int index = 0; + for (int i = 0; i < num_requests; ++i) { + get_request **get_req_ptr = + (get_request **) utarray_eltptr(object_get_reqs->get_requests, index); + get_request *get_req = *get_req_ptr; + + int num_updated = 0; + for (int j = 0; j < get_req->num_objects_to_wait_for; ++j) { + object_table_entry *entry; + HASH_FIND(handle, store_state->plasma_store_info->objects, &obj_id, + sizeof(obj_id), entry); + CHECK(entry != NULL); + + if (object_ids_equal(get_req->object_ids[j], obj_id)) { + initialize_plasma_object(&get_req->objects[j], entry); + num_updated += 1; + get_req->num_satisfied += 1; + /* Record the fact that this client will be using this object and will + * be responsible for releasing this object. */ + add_client_to_object_clients(entry, get_req->client); + } + } + /* Check a few things just to be sure there aren't bugs. */ + DCHECK(num_updated > 0); + if (num_updated > 1) { + LOG_WARN("A get request contained a duplicated object ID."); + } + + /* If this get request is done, reply to the client. */ + if (get_req->num_satisfied == get_req->num_objects_to_wait_for) { + return_from_get(store_state, get_req); + } else { + /* The call to return_from_get will remove the current element in the + * array, so we only increment the counter in the else branch. */ + index += 1; + } + } + DCHECK(index == utarray_len(object_get_reqs->get_requests)); + /* Remove the array of get requests for this object, since no one should be + * waiting for this object anymore. */ + HASH_DELETE(hh, store_state->object_get_requests, object_get_reqs); + utarray_free(object_get_reqs->get_requests); + free(object_get_reqs); + } +} + +int get_timeout_handler(event_loop *loop, timer_id id, void *context) { + get_request *get_req = context; + return_from_get(get_req->client->plasma_state, get_req); + return EVENT_LOOP_TIMER_DONE; +} + +void process_get_request(client *client_context, + int num_object_ids, + object_id object_ids[], + uint64_t timeout_ms) { + plasma_store_state *plasma_state = client_context->plasma_state; + + /* Create a get request for this object. */ + get_request *get_req = malloc(sizeof(get_request)); + memset(get_req, 0, sizeof(get_request)); + get_req->client = client_context; + get_req->timer = -1; + get_req->num_object_ids = num_object_ids; + get_req->object_ids = malloc(num_object_ids * sizeof(object_id)); + get_req->objects = malloc(num_object_ids * sizeof(plasma_object)); + for (int i = 0; i < num_object_ids; ++i) { + get_req->object_ids[i] = object_ids[i]; + } + get_req->num_objects_to_wait_for = num_object_ids; + get_req->num_satisfied = 0; + + for (int i = 0; i < num_object_ids; ++i) { + object_id obj_id = object_ids[i]; + + /* Check if this object is already present locally. If so, record that the + * object is being used and mark it as accounted for. */ + object_table_entry *entry; + HASH_FIND(handle, plasma_state->plasma_store_info->objects, &obj_id, + sizeof(obj_id), entry); + if (entry && entry->state == PLASMA_SEALED) { + /* Update the get request to take into account the present object. */ + initialize_plasma_object(&get_req->objects[i], entry); + get_req->num_satisfied += 1; + /* If necessary, record that this client is using this object. In the case + * where entry == NULL, this will be called from seal_object. */ + add_client_to_object_clients(entry, client_context); + } else { + /* Add a placeholder plasma object to the get request to indicate that the + * object is not present. This will be parsed by the client. We memset it + * to 0 so valgrind doesn't complain. We set the data size to -1 to + * indicate that the object is not present. */ + memset(&get_req->objects[i], 0, sizeof(get_req->objects[i])); + get_req->objects[i].data_size = -1; + /* Add the get request to the relevant data structures. */ + add_get_request_for_object(plasma_state, obj_id, get_req); + } + } + + /* If all of the objects are present already or if the timeout is 0, return to + * the client. */ + if (get_req->num_satisfied == get_req->num_objects_to_wait_for || + timeout_ms == 0) { + return_from_get(plasma_state, get_req); + } else if (timeout_ms != -1) { + /* Set a timer that will cause the get request to return to the client. Note + * that a timeout of -1 is used to indicate that no timer should be set. */ + get_req->timer = event_loop_add_timer(plasma_state->loop, timeout_ms, + get_timeout_handler, get_req); } - return OBJECT_NOT_FOUND; } /* Get an object from the local Plasma Store if exists. */ @@ -337,56 +564,8 @@ void seal_object(client *client_context, /* Inform all subscribers that a new object has been sealed. */ push_notification(plasma_state, object_id); - /* Inform processes getting this object that the object is ready now. */ - object_notify_entry *notify_entry; - HASH_FIND(handle, plasma_state->objects_notify, &object_id, sizeof(object_id), - notify_entry); - if (notify_entry) { - plasma_object object; - object.handle.store_fd = entry->fd; - object.handle.mmap_size = entry->map_size; - object.data_offset = entry->offset; - object.metadata_offset = entry->offset + entry->info.data_size; - object.data_size = entry->info.data_size; - object.metadata_size = entry->info.metadata_size; - HASH_DELETE(handle, plasma_state->objects_notify, notify_entry); - /* Send notifications to the clients that were waiting for this object. */ - for (int i = 0; i < utarray_len(notify_entry->waiting_clients); ++i) { - client **c = (client **) utarray_eltptr(notify_entry->waiting_clients, i); - int status; - /* Send the get reply to the client. Handle errors on the write. If the - * client has hung up, that's ok. */ - if (plasma_send_GetReply((*c)->sock, plasma_state->builder, &object_id, - &object, 1) < 0) { - if (errno == EPIPE || errno == EBADF) { - LOG_WARN( - "Failed to send a message to client on fd %d. The client may " - "have hung up.", - (*c)->sock); - continue; - } else { - LOG_FATAL("Failed to send a message to client on fd %d.", (*c)->sock); - } - } - /* Send the object's file descriptor to the client. Handle errors on the - * write. If the client has hung up, that's ok. */ - if (send_fd((*c)->sock, object.handle.store_fd) < 0) { - if (errno == EPIPE || errno == EBADF) { - LOG_WARN( - "Failed to send a message to client on fd %d. The client may " - "have hung up.", - (*c)->sock); - continue; - } else { - LOG_FATAL("Failed to send a message to client on fd %d.", (*c)->sock); - } - } - /* Record that the client is using this object. */ - add_client_to_object_clients(entry, *c); - } - utarray_free(notify_entry->waiting_clients); - free(notify_entry); - } + /* Update all get requests that involve this object. */ + update_object_get_requests(plasma_state, object_id); } /* Delete an object that has been created in the hash table. This should only @@ -556,35 +735,12 @@ void process_message(event_loop *loop, } } break; case MessageType_PlasmaGetRequest: { - plasma_read_GetRequest(input, object_ids, 1); - if (get_object(client_context, client_sock, object_ids[0], &objects[0]) == - OBJECT_FOUND) { - warn_if_sigpipe(plasma_send_GetReply(client_sock, state->builder, - object_ids, objects, 1), - client_sock); - warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd), - client_sock); - } - } break; - case MessageType_PlasmaGetLocalRequest: { - plasma_read_GetLocalRequest(input, &object_ids[0], 1); - if (get_object_local(client_context, client_sock, object_ids[0], - &objects[0]) == OBJECT_FOUND) { - int has_object = 1; - warn_if_sigpipe( - plasma_send_GetLocalReply(client_sock, state->builder, object_ids, - objects, &has_object, 1), - client_sock); - warn_if_sigpipe(send_fd(client_sock, objects[0].handle.store_fd), - client_sock); - } else { - int has_object = 0; - - warn_if_sigpipe( - plasma_send_GetLocalReply(client_sock, state->builder, object_ids, - objects, &has_object, 1), - client_sock); - } + num_objects = plasma_read_GetRequest_num_objects(input); + object_id object_ids_to_get[num_objects]; + int64_t timeout_ms; + plasma_read_GetRequest(input, object_ids_to_get, &timeout_ms, num_objects); + process_get_request(client_context, num_objects, object_ids_to_get, + timeout_ms); } break; case MessageType_PlasmaReleaseRequest: plasma_read_ReleaseRequest(input, &object_ids[0]); diff --git a/src/plasma/test/client_tests.c b/src/plasma/test/client_tests.c index 06a19744b..d91a26c25 100644 --- a/src/plasma/test/client_tests.c +++ b/src/plasma/test/client_tests.c @@ -115,15 +115,16 @@ bool is_equal_data_123(uint8_t *data1, uint8_t *data2, uint64_t size) { return true; } -TEST plasma_get_local_tests(void) { +TEST plasma_nonblocking_get_tests(void) { plasma_connection *plasma_conn = plasma_connect( "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); object_id oid = globally_unique_id(); + object_id oid_array[1] = {oid}; object_buffer obj_buffer; /* Test for object non-existence. */ - int status = plasma_get_local(plasma_conn, oid, &obj_buffer); - ASSERT(status == false); + plasma_get(plasma_conn, oid_array, 1, 0, &obj_buffer); + ASSERT(obj_buffer.data_size == -1); /* Test for the object being in local Plasma store. */ /* First create object. */ @@ -136,8 +137,7 @@ TEST plasma_get_local_tests(void) { plasma_seal(plasma_conn, oid); sleep(1); - status = plasma_get_local(plasma_conn, oid, &obj_buffer); - ASSERT(status == true); + plasma_get(plasma_conn, oid_array, 1, 0, &obj_buffer); ASSERT(is_equal_data_123(data, obj_buffer.data, data_size) == true); sleep(1); @@ -222,6 +222,9 @@ TEST plasma_get_tests(void) { object_id oid2 = globally_unique_id(); object_buffer obj_buffer; + object_id oid_array1[1] = {oid1}; + object_id oid_array2[1] = {oid2}; + int64_t data_size = 4; uint8_t metadata[] = {5}; int64_t metadata_size = sizeof(metadata); @@ -230,14 +233,15 @@ TEST plasma_get_tests(void) { init_data_123(data, data_size, 1); plasma_seal(plasma_conn1, oid1); - plasma_client_get(plasma_conn1, oid1, &obj_buffer); + plasma_get(plasma_conn1, oid_array1, 1, -1, &obj_buffer); ASSERT(data[0] == obj_buffer.data[0]); plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); init_data_123(data, data_size, 2); plasma_seal(plasma_conn2, oid2); - plasma_client_get(plasma_conn1, oid2, &obj_buffer); + plasma_fetch(plasma_conn1, 1, oid_array2); + plasma_get(plasma_conn1, oid_array2, 1, -1, &obj_buffer); ASSERT(data[0] == obj_buffer.data[0]); sleep(1); @@ -247,82 +251,7 @@ TEST plasma_get_tests(void) { PASS(); } -TEST plasma_wait_tests(void) { - plasma_connection *plasma_conn1 = plasma_connect( - "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); - plasma_connection *plasma_conn2 = plasma_connect( - "/tmp/store2", "/tmp/manager2", PLASMA_DEFAULT_RELEASE_DELAY); - object_id oid1 = globally_unique_id(); - object_id oid2 = globally_unique_id(); - object_id obj_ids[NUM_OBJ_REQUEST]; - object_id return_obj_ids[NUM_OBJ_REQUEST]; - - obj_ids[0] = oid1; - obj_ids[1] = oid2; - - struct timeval start, end; - gettimeofday(&start, NULL); - - int n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, - WAIT_TIMEOUT_MS, 1, return_obj_ids); - - ASSERT(n == 0); - gettimeofday(&end, NULL); - float diff_ms = (end.tv_sec - start.tv_sec); - diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; - /* Reduce threshold by 10% to make sure we pass consistently. */ - ASSERT(diff_ms > WAIT_TIMEOUT_MS * 0.9); - - /* Create and insert an object in plasma_conn1. */ - int64_t data_size = 4; - uint8_t metadata[] = {5}; - int64_t metadata_size = sizeof(metadata); - uint8_t *data; - plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); - plasma_seal(plasma_conn1, oid1); - - n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, - WAIT_TIMEOUT_MS, 1, return_obj_ids); - ASSERT(n == 1); - ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); - - gettimeofday(&start, NULL); - return_obj_ids[0].id[0] = 0; - n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, - WAIT_TIMEOUT_MS, 2, return_obj_ids); - ASSERT(n == 1); - ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); - gettimeofday(&end, NULL); - diff_ms = (end.tv_sec - start.tv_sec); - diff_ms = (((diff_ms * 1000000.) + end.tv_usec) - (start.tv_usec)) / 1000.; - ASSERT(diff_ms > WAIT_TIMEOUT_MS * 0.9); - - /* Create and insert an object in plasma_conn1. */ - plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); - plasma_seal(plasma_conn2, oid2); - - return_obj_ids[0].id[0] = 0; - n = plasma_client_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_ids, - WAIT_TIMEOUT_MS, 2, return_obj_ids); - ASSERT(n == 2); - ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); - ASSERT(oid2.id[0] == return_obj_ids[1].id[0]); - - return_obj_ids[0].id[0] = return_obj_ids[1].id[0] = 0; - n = plasma_client_wait(plasma_conn2, NUM_OBJ_REQUEST, obj_ids, - WAIT_TIMEOUT_MS, 2, return_obj_ids); - ASSERT(n == 2); - ASSERT(oid1.id[0] == return_obj_ids[0].id[0]); - ASSERT(oid2.id[0] == return_obj_ids[1].id[0]); - - sleep(1); - plasma_disconnect(plasma_conn1); - plasma_disconnect(plasma_conn2); - - PASS(); -} - -TEST plasma_multiget_tests(void) { +TEST plasma_get_multiple_tests(void) { plasma_connection *plasma_conn1 = plasma_connect( "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); plasma_connection *plasma_conn2 = plasma_connect( @@ -344,14 +273,16 @@ TEST plasma_multiget_tests(void) { init_data_123(data, data_size, obj1_first); plasma_seal(plasma_conn1, oid1); - plasma_client_multiget(plasma_conn1, 1, obj_ids, obj_buffer); + /* This only waits for oid1. */ + plasma_get(plasma_conn1, obj_ids, 1, -1, obj_buffer); ASSERT(data[0] == obj_buffer[0].data[0]); plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); init_data_123(data, data_size, obj2_first); plasma_seal(plasma_conn2, oid2); - plasma_client_multiget(plasma_conn1, 2, obj_ids, obj_buffer); + plasma_fetch(plasma_conn1, 2, obj_ids); + plasma_get(plasma_conn1, obj_ids, 2, -1, obj_buffer); ASSERT(obj1_first == obj_buffer[0].data[0]); ASSERT(obj2_first == obj_buffer[1].data[0]); @@ -365,11 +296,10 @@ TEST plasma_multiget_tests(void) { SUITE(plasma_client_tests) { RUN_TEST(plasma_status_tests); RUN_TEST(plasma_fetch_tests); - RUN_TEST(plasma_get_local_tests); + RUN_TEST(plasma_nonblocking_get_tests); RUN_TEST(plasma_wait_for_objects_tests); RUN_TEST(plasma_get_tests); - RUN_TEST(plasma_wait_tests); - RUN_TEST(plasma_multiget_tests); + RUN_TEST(plasma_get_multiple_tests); } GREATEST_MAIN_DEFS(); diff --git a/src/plasma/test/serialization_tests.c b/src/plasma/test/serialization_tests.c index d6e47e465..c2696136e 100644 --- a/src/plasma/test/serialization_tests.c +++ b/src/plasma/test/serialization_tests.c @@ -133,13 +133,16 @@ TEST plasma_get_request_test(void) { object_id object_ids[2]; object_ids[0] = globally_unique_id(); object_ids[1] = globally_unique_id(); - plasma_send_GetRequest(fd, g_B, object_ids, 2); + int64_t timeout_ms = 1234; + plasma_send_GetRequest(fd, g_B, object_ids, 2, timeout_ms); uint8_t *data = read_message_from_file(fd, MessageType_PlasmaGetRequest); int64_t num_objects; object_id object_ids_return[2]; - plasma_read_GetRequest(data, &object_ids_return[0], 2); + int64_t timeout_ms_return; + plasma_read_GetRequest(data, &object_ids_return[0], &timeout_ms_return, 2); ASSERT(object_ids_equal(object_ids[0], object_ids_return[0])); ASSERT(object_ids_equal(object_ids[1], object_ids_return[1])); + ASSERT(timeout_ms == timeout_ms_return); free(data); close(fd); PASS(); @@ -155,7 +158,7 @@ TEST plasma_get_reply_test(void) { plasma_objects[1] = random_plasma_object(); plasma_send_GetReply(fd, g_B, object_ids, plasma_objects, 2); uint8_t *data = read_message_from_file(fd, MessageType_PlasmaGetReply); - int64_t num_objects = plasma_read_GetLocalRequest_num_objects(data); + int64_t num_objects = plasma_read_GetRequest_num_objects(data); object_id object_ids_return[num_objects]; plasma_object plasma_objects_return[2]; plasma_read_GetReply(data, object_ids_return, &plasma_objects_return[0], @@ -171,53 +174,6 @@ TEST plasma_get_reply_test(void) { PASS(); } -TEST plasma_get_local_request_test(void) { - int fd = create_temp_file(); - object_id object_ids[2]; - object_ids[0] = globally_unique_id(); - object_ids[1] = globally_unique_id(); - plasma_send_GetLocalRequest(fd, g_B, object_ids, 2); - uint8_t *data = read_message_from_file(fd, MessageType_PlasmaGetLocalRequest); - int64_t num_objects = plasma_read_GetLocalRequest_num_objects(data); - object_id object_ids_return[num_objects]; - plasma_read_GetLocalRequest(data, object_ids_return, num_objects); - ASSERT(object_ids_equal(object_ids[0], object_ids_return[0])); - ASSERT(object_ids_equal(object_ids[1], object_ids_return[1])); - free(data); - close(fd); - PASS(); -} - -TEST plasma_get_local_reply_test(void) { - int fd = create_temp_file(); - object_id object_ids[2]; - object_ids[0] = globally_unique_id(); - object_ids[1] = globally_unique_id(); - plasma_object plasma_objects[2]; - plasma_objects[0] = random_plasma_object(); - plasma_objects[1] = random_plasma_object(); - int has_object[2] = {1, 0}; - plasma_send_GetLocalReply(fd, g_B, object_ids, plasma_objects, has_object, 2); - uint8_t *data = read_message_from_file(fd, MessageType_PlasmaGetLocalReply); - int64_t num_objects; - object_id object_ids_return[2]; - plasma_object plasma_objects_return[2]; - int has_object_return[2]; - plasma_read_GetLocalReply(data, &object_ids_return[0], - &plasma_objects_return[0], has_object_return, 2); - ASSERT(object_ids_equal(object_ids[0], object_ids_return[0])); - ASSERT(object_ids_equal(object_ids[1], object_ids_return[1])); - ASSERT(memcmp(&plasma_objects[0], &plasma_objects_return[0], - sizeof(plasma_object)) == 0); - ASSERT(memcmp(&plasma_objects[1], &plasma_objects_return[1], - sizeof(plasma_object)) == 0); - ASSERT(has_object_return[0] == has_object[0]); - ASSERT(has_object_return[1] == has_object[1]); - free(data); - close(fd); - PASS(); -} - TEST plasma_release_request_test(void) { int fd = create_temp_file(); object_id object_id1 = globally_unique_id(); @@ -462,8 +418,6 @@ SUITE(plasma_serialization_tests) { RUN_TEST(plasma_seal_reply_test); RUN_TEST(plasma_get_request_test); RUN_TEST(plasma_get_reply_test); - RUN_TEST(plasma_get_local_request_test); - RUN_TEST(plasma_get_local_reply_test); RUN_TEST(plasma_release_request_test); RUN_TEST(plasma_release_reply_test); RUN_TEST(plasma_delete_request_test); diff --git a/test/runtest.py b/test/runtest.py index c6b3374fc..d9edcea11 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -382,6 +382,38 @@ class APITest(unittest.TestCase): ray.worker.cleanup() + def testMultipleWaitsAndGets(self): + # It is important to use three workers here, so that the three tasks + # launched in this experiment can run at the same time. + ray.init(num_workers=3) + + @ray.remote + def f(delay): + time.sleep(delay) + return 1 + + @ray.remote + def g(l): + # The argument l should be a list containing one object ID. + ray.wait([l[0]]) + + @ray.remote + def h(l): + # The argument l should be a list containing one object ID. + ray.get(l[0]) + + # Make sure that multiple wait requests involving the same object ID all + # return. + x = f.remote(1) + ray.get([g.remote([x]), g.remote([x])]) + + # Make sure that multiple get requests involving the same object ID all + # return. + x = f.remote(1) + ray.get([h.remote([x]), h.remote([x])]) + + ray.worker.cleanup() + def testCachingEnvironmentVariables(self): # Test that we can define environment variables before the driver is connected. def foo_initializer():