diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py index f801055a2..edbab95a5 100644 --- a/lib/python/ray/worker.py +++ b/lib/python/ray/worker.py @@ -437,7 +437,7 @@ class Worker(object): Args: objectid (object_id.ObjectID): The object ID of the value to retrieve. """ - self.plasma_client.fetch2([objectid.id()]) + self.plasma_client.fetch([objectid.id()]) buff = self.plasma_client.get(objectid.id()) metadata = self.plasma_client.get_metadata(objectid.id()) metadata_size = len(metadata) diff --git a/src/photon/photon_algorithm.c b/src/photon/photon_algorithm.c index c2827e1ba..045d9a19c 100644 --- a/src/photon/photon_algorithm.c +++ b/src/photon/photon_algorithm.c @@ -123,7 +123,7 @@ bool can_run(scheduling_algorithm_state *algorithm_state, task_spec *task) { int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { fetch_object_request *fetch_req = (fetch_object_request *) context; object_id object_ids[1] = {fetch_req->object_id}; - plasma_fetch2(fetch_req->state->plasma_conn, 1, object_ids); + plasma_fetch(fetch_req->state->plasma_conn, 1, object_ids); return LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS; } @@ -140,7 +140,7 @@ void fetch_missing_dependencies(local_scheduler_state *state, if (entry == NULL) { /* The object is not present locally, fetch the object. */ object_id object_ids[1] = {obj_id}; - plasma_fetch2(state->plasma_conn, 1, object_ids); + plasma_fetch(state->plasma_conn, 1, object_ids); /* Create a fetch request and add a timer to the event loop to ensure * that the fetch actually happens. */ fetch_object_request *fetch_req = malloc(sizeof(fetch_object_request)); diff --git a/src/plasma/lib/python/plasma.py b/src/plasma/lib/python/plasma.py index bdeefb340..1be68ec93 100644 --- a/src/plasma/lib/python/plasma.py +++ b/src/plasma/lib/python/plasma.py @@ -188,20 +188,12 @@ class PlasmaClient(object): return libplasma.transfer(self.conn, object_id, addr, port) def fetch(self, object_ids): - """Fetch the object with id object_id from another plasma manager instance. - - Args: - object_id (str): A string used to identify an object. - """ - return libplasma.fetch(self.conn, object_ids) - - def fetch2(self, object_ids): """Fetch the objects with the given IDs from other plasma manager instances. Args: object_ids (List[str]): A list of strings used to identify the objects. """ - return libplasma.fetch2(self.conn, object_ids) + return libplasma.fetch(self.conn, object_ids) def wait(self, object_ids, timeout=PLASMA_WAIT_TIMEOUT, num_returns=1): """Wait until num_returns objects in object_ids are ready. diff --git a/src/plasma/plasma.h b/src/plasma/plasma.h index f17bfbbf8..a0cb531fe 100644 --- a/src/plasma/plasma.h +++ b/src/plasma/plasma.h @@ -120,21 +120,13 @@ enum plasma_message_type { /** Header for sending data. */ PLASMA_DATA, /** Request a fetch of an object in another store. Non-blocking call. */ - PLASMA_FETCH_REMOTE, - /** Request a fetch of an object in another store. Blocking call. */ PLASMA_FETCH, - /** Request a fetch of an object in another store. Non-blocking call. */ - PLASMA_FETCH2, /** Request status of an object, i.e., whether the object is stored in the * local Plasma Store, in a remote Plasma Store, in transfer, or doesn't * exist in the system. */ PLASMA_STATUS, /** Wait until an object becomes available. */ - PLASMA_WAIT, - /** Wait until an object becomes available. */ - PLASMA_WAIT1, - /** Wait until an object becomes available. */ - PLASMA_WAIT2 + PLASMA_WAIT }; typedef struct { diff --git a/src/plasma/plasma_client.c b/src/plasma/plasma_client.c index 56c58557a..696220fc6 100644 --- a/src/plasma/plasma_client.c +++ b/src/plasma/plasma_client.c @@ -552,97 +552,20 @@ void plasma_transfer(plasma_connection *conn, void plasma_fetch(plasma_connection *conn, int num_object_ids, - object_id object_ids[], - int is_fetched[]) { - CHECK(conn->manager_conn >= 0); - /* Make sure that there are no duplicated object IDs. TODO(rkn): we should - * allow this case in the future. */ - CHECK(plasma_object_ids_distinct(num_object_ids, object_ids)); - plasma_request *req = plasma_alloc_request(num_object_ids); - for (int i = 0; i < num_object_ids; ++i) { - req->object_requests[i].object_id = object_ids[i]; - } - LOG_DEBUG("Requesting fetch"); - CHECK(plasma_send_request(conn->manager_conn, PLASMA_FETCH, req) >= 0); - free(req); - - plasma_reply reply; - int success; - for (int received = 0; received < num_object_ids; ++received) { - CHECK(plasma_receive_reply(conn->manager_conn, sizeof(reply), &reply) >= 0); - CHECK(reply.num_object_ids == 1); - success = reply.has_object; - /* Update the correct index in is_fetched. */ - int i = 0; - for (; i < num_object_ids; ++i) { - if (object_ids_equal(object_ids[i], reply.object_requests[0].object_id) && - !is_fetched[i]) { - is_fetched[i] = success; - break; - } - } - CHECKM(i != num_object_ids, - "Received an unexpected object ID from manager during fetch or the " - "object ID was received multiple times."); - } -} - -void plasma_fetch2(plasma_connection *conn, - int num_object_ids, - object_id object_ids[]) { + object_id object_ids[]) { CHECK(conn != NULL); CHECK(conn->manager_conn >= 0); plasma_request *req = plasma_alloc_request(num_object_ids); for (int i = 0; i < num_object_ids; ++i) { req->object_requests[i].object_id = object_ids[i]; } - CHECK(plasma_send_request(conn->manager_conn, PLASMA_FETCH2, req) >= 0); -} - -int plasma_wait(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - uint64_t timeout, - int num_returns, - object_id return_object_ids[]) { - CHECK(conn->manager_conn >= 0); - plasma_request *req = plasma_alloc_request(num_object_ids); - for (int i = 0; i < num_object_ids; ++i) { - req->object_requests[i].object_id = object_ids[i]; - } - req->num_ready_objects = num_returns; - req->timeout = timeout; - CHECK(plasma_send_request(conn->manager_conn, PLASMA_WAIT, req) >= 0); - plasma_free_request(req); - int64_t return_size = plasma_reply_size(num_returns); - plasma_reply *reply = malloc(return_size); - CHECK(plasma_receive_reply(conn->manager_conn, return_size, reply) >= 0); - for (int i = 0; i < num_returns; ++i) { - return_object_ids[i] = reply->object_requests[i].object_id; - } - int num_objects_returned = reply->num_objects_returned; - free(reply); - return num_objects_returned; + CHECK(plasma_send_request(conn->manager_conn, PLASMA_FETCH, req) >= 0); } int get_manager_fd(plasma_connection *conn) { return conn->manager_conn; } -/** === ALTERNATE PLASMA CLIENT API === - - * This API simplifies the previous one in two ways. First if factors out - * object (re)construction from the Plasma Manager. Second, except for - * plasma_wait_for_objects() all other functions are non-blocking. - * - * TODO: - * - plasma_info() not implemented yet, but not needed at this point. - * - assume new implementation of object_table_subscribe() which returns - * if object is in the Local Store (check with jpm). - * - need to phase out old API and drope *1 from the names of the functions - * once the old ones are dropped. - */ - bool plasma_get_local(plasma_connection *conn, object_id object_id, object_buffer *object_buffer) { @@ -692,20 +615,6 @@ bool plasma_get_local(plasma_connection *conn, return true; } -int plasma_fetch_remote(plasma_connection *conn, object_id object_id) { - CHECK(conn != NULL); - CHECK(conn->manager_conn >= 0); - - plasma_request req = plasma_make_request(object_id); - CHECK(plasma_send_request(conn->manager_conn, PLASMA_FETCH_REMOTE, &req) >= - 0); - - plasma_reply reply; - CHECK(plasma_receive_reply(conn->manager_conn, sizeof(reply), &reply) >= 0); - - return reply.object_status; -} - int plasma_status(plasma_connection *conn, object_id object_id) { CHECK(conn != NULL); CHECK(conn->manager_conn >= 0); @@ -719,57 +628,11 @@ int plasma_status(plasma_connection *conn, object_id object_id) { return reply.object_status; } -int plasma_wait_for_objects(plasma_connection *conn, - int num_object_requests, - object_request object_requests[], - int num_ready_objects, - uint64_t timeout_ms) { - CHECK(conn != NULL); - CHECK(conn->manager_conn >= 0); - CHECK(num_object_requests > 0); - - plasma_request *req = plasma_alloc_request(num_object_requests); - for (int i = 0; i < num_object_requests; ++i) { - req->object_requests[i] = object_requests[i]; - } - req->num_ready_objects = num_ready_objects; - req->timeout = timeout_ms; - CHECK(plasma_send_request(conn->manager_conn, PLASMA_WAIT1, req) >= 0); - free(req); - - plasma_reply *reply = plasma_alloc_reply(num_object_requests); - CHECK(plasma_receive_reply(conn->manager_conn, - plasma_reply_size(num_object_requests), - reply) >= 0); - int num_objects_ready = 0; - for (int i = 0; i < num_object_requests; ++i) { - int type, status; - object_requests[i].object_id = reply->object_requests[i].object_id; - type = reply->object_requests[i].type; - object_requests[i].type = type; - status = reply->object_requests[i].status; - object_requests[i].status = status; - - if (type == PLASMA_QUERY_LOCAL) { - if (status == PLASMA_OBJECT_LOCAL) { - num_objects_ready += 1; - } - } else { - CHECK(type == PLASMA_QUERY_ANYWHERE); - if (status == PLASMA_OBJECT_LOCAL || status == PLASMA_OBJECT_REMOTE) { - num_objects_ready += 1; - } - } - } - free(reply); - return num_objects_ready; -} - -int plasma_wait_for_objects2(plasma_connection *conn, - int num_object_requests, - object_request object_requests[], - int num_ready_objects, - uint64_t timeout_ms) { +int plasma_wait(plasma_connection *conn, + int num_object_requests, + object_request object_requests[], + int num_ready_objects, + uint64_t timeout_ms) { CHECK(conn != NULL); CHECK(conn->manager_conn >= 0); CHECK(num_object_requests > 0); @@ -784,7 +647,7 @@ int plasma_wait_for_objects2(plasma_connection *conn, } req->num_ready_objects = num_ready_objects; req->timeout = timeout_ms; - CHECK(plasma_send_request(conn->manager_conn, PLASMA_WAIT2, req) >= 0); + CHECK(plasma_send_request(conn->manager_conn, PLASMA_WAIT, req) >= 0); free(req); plasma_reply *reply = plasma_alloc_reply(num_object_requests); @@ -827,35 +690,37 @@ int plasma_wait_for_objects2(plasma_connection *conn, */ void plasma_client_get(plasma_connection *conn, - object_id object_id, + object_id obj_id, object_buffer *object_buffer) { CHECK(conn != NULL); CHECK(conn->manager_conn >= 0); object_request request; - request.object_id = object_id; + request.object_id = obj_id; while (true) { - if (plasma_get_local(conn, object_id, object_buffer)) { + if (plasma_get_local(conn, obj_id, object_buffer)) { /* Object is in the local Plasma Store, and it is sealed. */ return; } - switch (plasma_fetch_remote(conn, object_id)) { + object_id object_ids[1] = {obj_id}; + plasma_fetch(conn, 1, object_ids); + switch (plasma_status(conn, obj_id)) { case PLASMA_OBJECT_LOCAL: /* Object has finished being transfered just after calling * plasma_get_local(), and it is now in the local Plasma Store. Loop again * to call plasma_get_local() and eventually return. */ continue; case PLASMA_OBJECT_REMOTE: - /* A fetch request has been already scheduled for object_id, so wait for + /* A fetch request has been already scheduled for obj_id, so wait for * it to complete. */ request.type = PLASMA_QUERY_LOCAL; break; case PLASMA_OBJECT_NONEXISTENT: /* Object doesn’t exist in the system so ask local scheduler to create it. */ - /* TODO: scheduler_create_object(object_id); */ + /* TODO: scheduler_create_object(obj_id); */ /* Wait for the object to be (re)constructed and sealed either in the * local Plasma Store or remotely. */ request.type = PLASMA_QUERY_ANYWHERE; @@ -865,9 +730,9 @@ void plasma_client_get(plasma_connection *conn, } /* - * Wait for object_id to (1) be transferred and sealed in the local + * Wait for obj_id to (1) be transferred and sealed in the local * Plasma Store, if available remotely, or (2) be (re)constructued either - * locally or remotely, if object_id didn't exist in the system. + * locally or remotely, if obj_id didn't exist in the system. * - if timeout, next iteration will retry plasma_fetch() or * scheduler_create_object() * - if request.status == PLASMA_OBJECT_LOCAL, next iteration @@ -878,7 +743,7 @@ void plasma_client_get(plasma_connection *conn, * will call scheduler_create_object() */ #define TIMEOUT_WAIT_MS 200 - plasma_wait_for_objects(conn, 1, &request, 1, TIMEOUT_WAIT_MS); + plasma_wait(conn, 1, &request, 1, TIMEOUT_WAIT_MS); } } @@ -908,8 +773,8 @@ int plasma_client_wait(plasma_connection *conn, struct timeval start, end; gettimeofday(&start, NULL); - int n = plasma_wait_for_objects(conn, num_object_ids, requests, num_returns, - MIN(remaining_timeout, TIMEOUT_WAIT_MS)); + int n = plasma_wait(conn, num_object_ids, requests, num_returns, + MIN(remaining_timeout, TIMEOUT_WAIT_MS)); gettimeofday(&end, NULL); float diff_ms = (end.tv_sec - start.tv_sec); @@ -961,30 +826,20 @@ void plasma_client_multiget(plasma_connection *conn, while (true) { int n; - /* Wait to get all objects in the system. The reason we call - * plasma_wait_for_objects() here instead of iterating over - * plasma_client_get() is to increase concurrency as plasma_client_get() is - * blocking. */ - n = plasma_wait_for_objects(conn, num_object_ids, requests, num_object_ids, - TIMEOUT_WAIT_MS); + /* Issue a fetch command so the object IDs end up locally. */ + plasma_fetch(conn, num_object_ids, object_ids); + + /* Wait to get all objects in the system. The reason we call plasma_wait() + * here instead of iterating over plasma_client_get() is to increase + * concurrency as plasma_client_get() is blocking. */ + n = plasma_wait(conn, num_object_ids, requests, num_object_ids, + TIMEOUT_WAIT_MS); if (n == num_object_ids) { /* All objects are in the system either on the local or a remote Plasma * store, so we are done. */ break; } - - for (int i = 0; i < num_object_ids; ++i) { - if (requests[i].status == PLASMA_OBJECT_REMOTE) { - plasma_fetch_remote(conn, requests[i].object_id); - } else { - if (requests[i].status == PLASMA_OBJECT_NONEXISTENT) { - /* Object doesn’t exist so ask local scheduler to create it. */ - /* TODO: scheduler_create_object(requests[i].object_id); */ - printf("XXX Need to schedule object -- not implemented yet!\n"); - } - } - } } /* Now get the data for every object. */ diff --git a/src/plasma/plasma_client.h b/src/plasma/plasma_client.h index c13464feb..0086120b2 100644 --- a/src/plasma/plasma_client.h +++ b/src/plasma/plasma_client.h @@ -184,38 +184,16 @@ void plasma_delete(plasma_connection *conn, object_id object_id); */ int64_t plasma_evict(plasma_connection *conn, int64_t num_bytes); -/** - * Fetch objects from remote plasma stores that have the - * objects stored. - * - * @param manager A file descriptor for the socket connection - * to the local manager. - * @param object_id_count The number of object IDs requested. - * @param object_ids[] The vector of object IDs requested. Length must be at - * least num_object_ids. - * @param is_fetched[] The vector in which to return the success - * of each object's fetch operation, in the same order as - * object_ids. Length must be at least num_object_ids. - * @return Void. - */ -void plasma_fetch(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - int is_fetched[]); - /** * Attempt to initiate the transfer of some objects from remote Plasma Stores. + * This method does not guarantee that the fetched objects will arrive locally. * * For an object that is available in the local Plasma Store, this method will * not do anything. For an object that is not available locally, it will check * if the object are already being fetched. If so, it will not do anything. If * not, it will query the object table for a list of Plasma Managers that have - * the object. If that list is non-empty, it will attempt to initiate transfers - * from one of those Plasma Managers. If the list is empty, it will set a - * callback to initiate a transfer when the list becomes non-empty. - * - * TODO(rkn): Setting the callback for when the list becomes non-empty is not - * implemented. + * the object. The object table will return a non-empty list, and this Plasma + * Manager will attempt to initiate transfers from one of those Plasma Managers. * * This function is non-blocking. * @@ -227,9 +205,9 @@ void plasma_fetch(plasma_connection *conn, * @param object_ids The IDs of the objects that fetch is being called on. * @return Void. */ -void plasma_fetch2(plasma_connection *conn, - int num_object_ids, - object_id object_ids[]); +void plasma_fetch(plasma_connection *conn, + int num_object_ids, + object_id object_ids[]); /** * Transfer local object to a different plasma manager. @@ -246,28 +224,6 @@ void plasma_transfer(plasma_connection *conn, int port, object_id object_id); -/** - * Wait for objects to be created (right now, wait for local objects). - * - * @param conn The object containing the connection state. - * @param num_object_ids Number of object IDs wait is called on. - * @param object_ids Object IDs wait is called on. - * @param timeout Wait will time out and return after this number of ms. - * @param num_returns Number of object IDs wait will return if it doesn't time - * out. - * @param return_object_ids Out parameter for the object IDs returned by wait. - * This is an array of size num_returns. If the number of objects that - * are ready when we time out, the objects will be stored in the last - * slots of the array and the number of objects is returned. - * @return Number of objects that are actually ready. - */ -int plasma_wait(plasma_connection *conn, - int num_object_ids, - object_id object_ids[], - uint64_t timeout, - int num_returns, - object_id return_object_ids[]); - /** * Subscribe to notifications when objects are sealed in the object store. * Whenever an object is sealed, a message will be written to the client socket @@ -288,8 +244,6 @@ int plasma_subscribe(plasma_connection *conn); */ int get_manager_fd(plasma_connection *conn); -/* === ALTERNATE PLASMA CLIENT API === */ - /** * Object buffer data structure. */ @@ -319,38 +273,7 @@ bool plasma_get_local(plasma_connection *conn, object_buffer *object_buffer); /** - * Initiates the fetch (transfer) of an object from a remote Plasma Store. - * - * If the object is stored in the local Plasma Store, tell the caller. - * - * If not, check whether the object is stored on a remote Plasma Store. If yes, - * and if a transfer for the object has either been scheduled or is in progress, - * then return. Otherwise schedule a transfer for the object. - * - * If the object is not available locally or remotely, the client has to tell - * local scheduler to (re)create the object. - * - * This function is non-blocking. - * - * @param conn The object containing the connection state. - * @param object_id The ID of the object we want to transfer. - * @return Status as returned by the get_status() function. Status can take the - * following values. - * - PLASMA_CLIENT_LOCAL, if the object is stored in the local Plasma - * Store. - * - PLASMA_CLIENT_TRANSFER, if the object is either currently being - * transferred or the transfer has been scheduled. - * - PLASMA_CLIENT_REMOTE, if the object is stored at a remote Plasma - * Store. - * - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in the - * system. - */ -int plasma_fetch_remote(plasma_connection *conn, object_id object_id); - -/** - * Return the status of a given object. This function is similar to - * plasma_fetch_remote() with the only difference that plamsa_fetch_remote() - * also schedules the obejct transfer, if not local. + * Return the status of a given object. This method may query the object table. * * @param conn The object containing the connection state. * @param object_id The ID of the object whose status we query. @@ -393,7 +316,9 @@ int plasma_info(plasma_connection *conn, * "type" field. * - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes * available in the local Plasma Store. In this case, this function - * sets the "status" field to PLASMA_OBJECT_LOCAL. + * sets the "status" field to PLASMA_OBJECT_LOCAL. Note, if the status + * is not PLASMA_OBJECT_LOCAL, it will be PLASMA_OBJECT_NONEXISTENT, + * but it may exist elsewhere in the system. * - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id becomes * available either at the local Plasma Store or on a remote Plasma * Store. In this case, the functions sets the "status" field to @@ -401,51 +326,18 @@ int plasma_info(plasma_connection *conn, * @param num_ready_objects The number of requests in object_requests array that * must be satisfied before the function returns, unless it timeouts. * The num_ready_objects should be no larger than num_object_requests. - * @param timeout_ms Timeout value in milliseconds. If this timeout expires + * @param timeout_ms Timeout value in milliseconds. If this timeout expires * before min_num_ready_objects of requests are satisfied, the function * returns. * @return Number of satisfied requests in the object_requests list. If the * returned number is less than min_num_ready_objects this means that * timeout expired. */ -int plasma_wait_for_objects(plasma_connection *conn, - int num_object_requests, - object_request object_requests[], - int num_ready_objects, - uint64_t timeout_ms); - -/** - * Wait for (1) a specified number of objects to be available (sealed) in the - * local Plasma Store or in a remote Plasma Store, or (2) for a timeout to - * expire. This is a blocking call. - * - * @param conn The object containing the connection state. - * @param num_object_requests Size of the object_requests array. - * @param object_requests Object event array. Each element contains a request - * for a particular object_id. The type of request is specified in the - * "type" field. - * - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes - * available in the local Plasma Store. In this case, this function - * sets the "status" field to PLASMA_OBJECT_LOCAL. - * - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id becomes - * available either at the local Plasma Store or on a remote Plasma - * Store. In this case, the functions sets the "status" field to - * PLASMA_OBJECT_LOCAL or PLASMA_OBJECT_REMOTE. - * @param num_ready_objects The number of requests in object_requests array that - * must be satisfied before the function returns, unless it timeouts. - * The num_ready_objects should be no larger than num_object_requests. - * @param timeout_ms Timeout value in milliseconds. If this timeout expires - * before min_num_ready_objects of requests are satisfied, the function - * returns. - * @return Number of satisfied requests in the object_requests list. If the - * returned number is less than min_num_ready_objects this means that - * timeout expired. - */ -int plasma_wait_for_objects2(plasma_connection *conn, - int num_object_requests, - object_request object_requests[], - int num_ready_objects, - uint64_t timeout_ms); +int plasma_wait(plasma_connection *conn, + int num_object_requests, + object_request object_requests[], + int num_ready_objects, + uint64_t timeout_ms); /** * TODO: maybe move the plasma_client_* functions in another file. @@ -498,8 +390,8 @@ int plasma_client_wait(plasma_connection *conn, * @param num_object_ids The number of objects in the array to be returned. * @param object_ids The array of object IDs to be returned. * @param object_buffers The array of data structure where the information of - * the return objects will be stored. The objects appear - * in the same order as their IDs in the object_ids array, + * the return objects will be stored. The objects appear in the same + * order as their IDs in the object_ids array, * @return Void. */ void plasma_client_multiget(plasma_connection *conn, diff --git a/src/plasma/plasma_extension.c b/src/plasma/plasma_extension.c index e9084ff3f..538bba8cd 100644 --- a/src/plasma/plasma_extension.c +++ b/src/plasma/plasma_extension.c @@ -170,49 +170,7 @@ PyObject *PyPlasma_fetch(PyObject *self, PyObject *args) { for (int i = 0; i < n; ++i) { PyObjectToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); } - /* Check that there are no duplicate object IDs. TODO(rkn): we should allow - * this in the future. */ - if (!plasma_object_ids_distinct(n, object_ids)) { - PyErr_SetString(PyExc_RuntimeError, - "The same object ID is used multiple times in this call to " - "fetch."); - return NULL; - } - int *success_array = malloc(sizeof(int) * n); - memset(success_array, 0, sizeof(int) * n); - plasma_fetch(conn, (int) n, object_ids, success_array); - PyObject *success_list = PyList_New(n); - for (int i = 0; i < n; ++i) { - if (success_array[i]) { - Py_INCREF(Py_True); - PyList_SetItem(success_list, i, Py_True); - } else { - Py_INCREF(Py_False); - PyList_SetItem(success_list, i, Py_False); - } - } - free(object_ids); - free(success_array); - return success_list; -} - -PyObject *PyPlasma_fetch2(PyObject *self, PyObject *args) { - plasma_connection *conn; - PyObject *object_id_list; - if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaConnection, &conn, - &object_id_list)) { - return NULL; - } - if (!plasma_manager_is_connected(conn)) { - PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); - return NULL; - } - Py_ssize_t n = PyList_Size(object_id_list); - object_id *object_ids = malloc(sizeof(object_id) * n); - for (int i = 0; i < n; ++i) { - PyObjectToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); - } - plasma_fetch2(conn, (int) n, object_ids); + plasma_fetch(conn, (int) n, object_ids); free(object_ids); Py_RETURN_NONE; } @@ -250,67 +208,6 @@ PyObject *PyPlasma_wait(PyObject *self, PyObject *args) { return NULL; } - object_id *object_ids = malloc(sizeof(object_id) * n); - for (int i = 0; i < n; ++i) { - PyObjectToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]); - } - object_id *return_ids = malloc(sizeof(object_id) * num_returns); - - /* Drop the global interpreter lock while we are waiting, so other threads can - * run. */ - int num_return_objects; - Py_BEGIN_ALLOW_THREADS; - num_return_objects = plasma_wait(conn, (int) n, object_ids, - (uint64_t) timeout, num_returns, return_ids); - Py_END_ALLOW_THREADS; - - PyObject *ready_ids = PyList_New(num_return_objects); - PyObject *waiting_ids = PySet_New(object_id_list); - for (int i = num_returns - num_return_objects; i < num_returns; ++i) { - PyObject *ready = - PyString_FromStringAndSize((char *) return_ids[i].id, UNIQUE_ID_SIZE); - PyList_SetItem(ready_ids, i - (num_returns - num_return_objects), ready); - PySet_Discard(waiting_ids, ready); - } - PyObject *t = PyTuple_New(2); - PyTuple_SetItem(t, 0, ready_ids); - PyTuple_SetItem(t, 1, waiting_ids); - return t; -} - -PyObject *PyPlasma_wait2(PyObject *self, PyObject *args) { - plasma_connection *conn; - PyObject *object_id_list; - long long timeout; - int num_returns; - if (!PyArg_ParseTuple(args, "O&OLi", PyObjectToPlasmaConnection, &conn, - &object_id_list, &timeout, &num_returns)) { - return NULL; - } - Py_ssize_t n = PyList_Size(object_id_list); - - if (!plasma_manager_is_connected(conn)) { - PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager"); - return NULL; - } - if (num_returns < 0) { - PyErr_SetString(PyExc_RuntimeError, - "The argument num_returns cannot be less than zero."); - return NULL; - } - if (num_returns > n) { - PyErr_SetString( - PyExc_RuntimeError, - "The argument num_returns cannot be greater than len(object_ids)"); - return NULL; - } - int64_t threshold = 1 << 30; - if (timeout > threshold) { - PyErr_SetString(PyExc_RuntimeError, - "The argument timeout cannot be greater than 2 ** 30."); - return NULL; - } - object_request *object_requests = malloc(sizeof(object_request) * n); for (int i = 0; i < n; ++i) { PyObjectToUniqueID(PyList_GetItem(object_id_list, i), @@ -321,8 +218,8 @@ PyObject *PyPlasma_wait2(PyObject *self, PyObject *args) { * run. */ int num_return_objects; Py_BEGIN_ALLOW_THREADS; - num_return_objects = plasma_wait_for_objects2( - conn, (int) n, object_requests, num_returns, (uint64_t) timeout); + num_return_objects = plasma_wait(conn, (int) n, object_requests, num_returns, + (uint64_t) timeout); Py_END_ALLOW_THREADS; int num_to_return = MIN(num_return_objects, num_returns); @@ -444,9 +341,7 @@ static PyMethodDef plasma_methods[] = { "Does the plasma store contain this plasma object?"}, {"fetch", PyPlasma_fetch, METH_VARARGS, "Fetch the object from another plasma manager instance."}, - {"fetch2", PyPlasma_fetch2, METH_VARARGS, - "Fetch the object from another plasma manager instance."}, - {"wait", PyPlasma_wait2, METH_VARARGS, + {"wait", PyPlasma_wait, METH_VARARGS, "Wait until num_returns objects in object_ids are ready."}, {"evict", PyPlasma_evict, METH_VARARGS, "Evict some objects until we recover some number of bytes."}, diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index f3d1e7d9b..53ab3845e 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -36,45 +36,15 @@ #include "state/db.h" #include "state/object_table.h" -void wait_object_lookup_callback(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context); - /** * Process either the fetch or the status request. * * @param client_conn Client connection. * @param object_id ID of the object for which we process this request. - * @param fetc If true we process a fetch request, and if false - * we process a status request. * @return Void. */ -void process_fetch_or_status_request(client_connection *client_conn, - object_id object_id, - bool fetch); -/** - * Request a transfer for the given object ID from the next manager believed to - * have a copy. Adds the request for this object ID to the queue of outgoing - * requests to the manager we want to try. - * - * requst_trasnfer_from() will lead to thic Plasma manager, call it S, - * (1) sending a PLASMA_TRANFER request for object_id to the other - * end-point, R, of the client_conn. R is a remote Plasma manager - * which is expected to store object_id. - * (2) upon receiving this request, R will invoke process_transfer_request() - * which will send a PLASMA_DATA request containing object_id back to - * S. - * (3) Upen receiving the PLASMA_DATA request, S, will invoke - * process_data_request() (via process_data_chunk()) to read object_id. - * Note that all requests that are exchanged between S and R are via FIFO - * queues. - * - * @param client_conn The context for the connection to this client. - * @param object_id The object ID we want to request a transfer of. - * @returns Void. - */ -void request_transfer_from(client_connection *client_conn, object_id object_id); +void process_status_request(client_connection *client_conn, + object_id object_id); /** * Request the transfer from a remote node or get the status of @@ -86,15 +56,12 @@ void request_transfer_from(client_connection *client_conn, object_id object_id); * @param manager_vector Array containing the Plasma Managers * running at the nodes where object_id is stored. * @param context Client connection. - * @param fetch If true, this was triggered by a fetc operation. If not. - * we request its status. * @return Status of object_id as defined in plasma.h */ -int request_fetch_or_status(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context, - bool fetch); +int request_status(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context); /** * Send requested object_id back to the Plasma Manager identified @@ -160,8 +127,28 @@ typedef struct { /** Handle for the uthash table in the manager state that keeps track of * outstanding fetch requests. */ UT_hash_handle hh; -} fetch_request2; +} fetch_request; +/** + * There are fundamentally two data structures used for handling wait requests. + * There is the "wait_request" struct and the "object_wait_requests" struct. A + * wait_request keeps track of all of the object IDs that a wait_request is + * waiting for. An object_wait_requests struct keeps track of all of the + * wait_request structs that are waiting for a particular object iD. The + * plasma_manager_state contains a hash table mapping object IDs to their + * coresponding object_wait_requests structs. + * + * These data structures are updated by several methods: + * - add_wait_request_for_object adds a wait_request to the + * object_wait_requests struct corresponding to a particular object ID. This + * is called when a client calls plasma_wait. + * - remove_wait_request_for_object removes a wait_request from an + * object_wait_requests struct. When a wait request returns, this method is + * called for all of the object IDs involved in that wait_request. + * - update_object_wait_requests removes an object_wait_requests struct and + * does some processing for each wait_request involved in that + * object_wait_requests struct. + */ typedef struct { /** The client connection that called wait. */ client_connection *client_conn; @@ -179,11 +166,11 @@ typedef struct { /** The number of object requests in this wait request that are already * satisfied. */ int64_t num_satisfied; -} wait_request2; +} wait_request; /** This is used to define the utarray of wait requests in the * object_wait_requests struct. */ -UT_icd wait_request2_icd = {sizeof(wait_request2 *), NULL, NULL, NULL}; +UT_icd wait_request_icd = {sizeof(wait_request *), NULL, NULL, NULL}; typedef struct { /** The ID of the object. This is used as a key in a hash table. */ @@ -209,13 +196,9 @@ struct plasma_manager_state { uint8_t addr[4]; /** Our port. */ int port; - /** Hash table of outstanding fetch requests. The key is - * object id, value is a list of connections to the clients - * who are blocking on a fetch of this object. */ - client_object_request *fetch_requests; /** Hash table of outstanding fetch requests. The key is the object ID. The * value is the data needed to perform the fetch. */ - fetch_request2 *fetch_requests2; + fetch_request *fetch_requests; /** A hash table mapping object IDs to a vector of the wait requests that * are waiting for the object to arrive locally. */ object_wait_requests *object_wait_requests_local; @@ -280,12 +263,6 @@ struct client_connection { int fd; /** Timer id for timing out wait (or fetch). */ int64_t timer_id; - /** True if this client is in a "wait" and false if it is in a "fetch". */ - bool is_wait; - /** True if we use new version of wait. */ - bool wait1; - /** True if we use the new version of fetch. */ - bool fetch1; /** If this client is processing a wait, this contains the object ids that * are already available. */ plasma_reply *wait_reply; @@ -303,114 +280,6 @@ struct client_connection { UT_hash_handle manager_hh; }; -void free_client_object_request(client_object_request *object_req) { - for (int i = 0; i < object_req->manager_count; ++i) { - free(object_req->manager_vector[i]); - } - free(object_req->manager_vector); - free(object_req); -} - -void send_client_reply(client_connection *conn, plasma_reply *reply) { - CHECK(conn->num_return_objects >= 0); - --conn->num_return_objects; - CHECK(plasma_send_reply(conn->fd, reply) >= 0); -} - -void send_client_failure_reply(object_id object_id, client_connection *conn) { - plasma_reply reply = plasma_make_reply(object_id); - reply.has_object = 0; - send_client_reply(conn, &reply); -} - -/** - * Get the context for the given object ID for the given client - * connection, if there is one active. - * - * @param client_conn The client connection context. - * @param object_id The object ID whose context we want. - * @return A pointer to the active object context, or NULL if - * there isn't one. - */ -client_object_request *get_object_request(client_connection *client_conn, - object_id object_id) { - client_object_request *object_req; - HASH_FIND(active_hh, client_conn->active_objects, &object_id, - sizeof(object_id), object_req); - return object_req; -} - -client_object_request *add_object_request(client_connection *client_conn, - object_id object_id) { - CHECK(client_conn); - /* Create a new context for this client connection and object. */ - client_object_request *object_req = malloc(sizeof(client_object_request)); - CHECK(object_req); - object_req->object_id = object_id; - object_req->client_conn = client_conn; - /* The timer ID returned by event_loop_add_timer is positive, so we can check - * if the timer is -1 to see if a timer has been added. */ - object_req->timer = -1; - object_req->manager_count = 0; - object_req->manager_vector = NULL; - object_req->next_manager = 0; - /* Register the object context with the client context. */ - client_object_request *temp_object_conn = NULL; - HASH_FIND(active_hh, client_conn->active_objects, &object_id, - sizeof(object_id), temp_object_conn); - CHECKM(temp_object_conn == NULL, - "The hash table already has an object connection for this object ID."); - HASH_ADD(active_hh, client_conn->active_objects, object_id, sizeof(object_id), - object_req); - /* Register the object context with the manager state. */ - client_object_request *fetch_requests; - HASH_FIND(fetch_hh, client_conn->manager_state->fetch_requests, &object_id, - sizeof(object_id), fetch_requests); - LOG_DEBUG("Registering fd %d for fetch.", client_conn->fd); - if (!fetch_requests) { - fetch_requests = NULL; - LL_APPEND(fetch_requests, object_req); - HASH_ADD(fetch_hh, client_conn->manager_state->fetch_requests, object_id, - sizeof(object_id), fetch_requests); - } else { - LL_APPEND(fetch_requests, object_req); - } - return object_req; -} - -void remove_object_request(client_connection *client_conn, - client_object_request *object_req) { - /* Deregister the object context with the client context. */ - /* TODO(rkn): Check that object_conn is actually in the hash table. */ - HASH_DELETE(active_hh, client_conn->active_objects, object_req); - /* Deregister the object context with the manager state. */ - client_object_request *object_reqs; - HASH_FIND(fetch_hh, client_conn->manager_state->fetch_requests, - &(object_req->object_id), sizeof(object_req->object_id), - object_reqs); - CHECK(object_reqs); - int len; - client_object_request *tmp; - LL_COUNT(object_reqs, tmp, len); - if (len == 1) { - HASH_DELETE(fetch_hh, client_conn->manager_state->fetch_requests, - object_reqs); - } - LL_DELETE(object_reqs, object_req); - /* remove_object_request() is not always called from the request's timer - * handle, so we remove the request's timer explicitly here. If - * remove_object_request() is called from the the request's timer handle, the - * code will still work correctly. While the timer handle returning - * EVENT_LOOP_TIMER_DONE will trigger another call for removing the request's - * timer, that's ok as event_loop_remove_timer() is idempotent. */ - if (object_req->timer != -1) { - event_loop_remove_timer(client_conn->manager_state->loop, - object_req->timer); - } - /* Free the object. */ - free_client_object_request(object_req); -} - object_wait_requests **object_wait_requests_table_ptr_from_type( plasma_manager_state *manager_state, int type) { @@ -427,7 +296,7 @@ object_wait_requests **object_wait_requests_table_ptr_from_type( void add_wait_request_for_object(plasma_manager_state *manager_state, object_id object_id, int type, - wait_request2 *wait_req) { + wait_request *wait_req) { object_wait_requests **object_wait_requests_table_ptr = object_wait_requests_table_ptr_from_type(manager_state, type); object_wait_requests *object_wait_reqs; @@ -439,7 +308,7 @@ void add_wait_request_for_object(plasma_manager_state *manager_state, if (object_wait_reqs == NULL) { object_wait_reqs = malloc(sizeof(object_wait_requests)); object_wait_reqs->object_id = object_id; - utarray_new(object_wait_reqs->wait_requests, &wait_request2_icd); + utarray_new(object_wait_reqs->wait_requests, &wait_request_icd); HASH_ADD(hh, *object_wait_requests_table_ptr, object_id, sizeof(object_wait_reqs->object_id), object_wait_reqs); } @@ -451,7 +320,7 @@ void add_wait_request_for_object(plasma_manager_state *manager_state, void remove_wait_request_for_object(plasma_manager_state *manager_state, object_id object_id, int type, - wait_request2 *wait_req) { + wait_request *wait_req) { object_wait_requests **object_wait_requests_table_ptr = object_wait_requests_table_ptr_from_type(manager_state, type); object_wait_requests *object_wait_reqs; @@ -462,8 +331,8 @@ void remove_wait_request_for_object(plasma_manager_state *manager_state, * vector. */ if (object_wait_reqs != NULL) { for (int i = 0; i < utarray_len(object_wait_reqs->wait_requests); ++i) { - wait_request2 **wait_req_ptr = - (wait_request2 **) utarray_eltptr(object_wait_reqs->wait_requests, i); + wait_request **wait_req_ptr = + (wait_request **) utarray_eltptr(object_wait_reqs->wait_requests, i); if (*wait_req_ptr == wait_req) { /* Remove the wait request from the array. */ utarray_erase(object_wait_reqs->wait_requests, i, 1); @@ -475,8 +344,8 @@ void remove_wait_request_for_object(plasma_manager_state *manager_state, } } -void remove_wait_request2(plasma_manager_state *manager_state, - wait_request2 *wait_req) { +void remove_wait_request(plasma_manager_state *manager_state, + wait_request *wait_req) { if (wait_req->timer != -1) { CHECK(event_loop_remove_timer(manager_state->loop, wait_req->timer) == AE_OK); @@ -485,8 +354,8 @@ void remove_wait_request2(plasma_manager_state *manager_state, free(wait_req); } -void return_from_wait2(plasma_manager_state *manager_state, - wait_request2 *wait_req) { +void return_from_wait(plasma_manager_state *manager_state, + wait_request *wait_req) { plasma_reply *reply = plasma_alloc_reply(wait_req->num_object_requests); reply->num_object_ids = wait_req->num_object_requests; for (int i = 0; i < wait_req->num_object_requests; ++i) { @@ -503,7 +372,7 @@ void return_from_wait2(plasma_manager_state *manager_state, wait_req->object_requests[i].type, wait_req); } /* Remove the wait request. */ - remove_wait_request2(manager_state, wait_req); + remove_wait_request(manager_state, wait_req); } void update_object_wait_requests(plasma_manager_state *manager_state, @@ -518,9 +387,9 @@ void update_object_wait_requests(plasma_manager_state *manager_state, object_wait_reqs); if (object_wait_reqs != NULL) { for (int i = 0; i < utarray_len(object_wait_reqs->wait_requests); ++i) { - wait_request2 **wait_req_ptr = - (wait_request2 **) utarray_eltptr(object_wait_reqs->wait_requests, i); - wait_request2 *wait_req = *wait_req_ptr; + wait_request **wait_req_ptr = + (wait_request **) utarray_eltptr(object_wait_reqs->wait_requests, i); + wait_request *wait_req = *wait_req_ptr; wait_req->num_satisfied += 1; /* Mark the object as present in the wait request. */ int j = 0; @@ -537,7 +406,7 @@ void update_object_wait_requests(plasma_manager_state *manager_state, CHECK(j != wait_req->num_object_requests); /* If this wait request is done, reply to the client. */ if (wait_req->num_satisfied == wait_req->num_object_requests) { - return_from_wait2(manager_state, wait_req); + return_from_wait(manager_state, wait_req); } } /* Remove the array of wait requests for this object, since no one should be @@ -548,10 +417,21 @@ void update_object_wait_requests(plasma_manager_state *manager_state, } } +fetch_request *create_fetch_request(plasma_manager_state *manager_state, + object_id object_id) { + fetch_request *fetch_req = malloc(sizeof(fetch_request)); + fetch_req->manager_state = manager_state; + fetch_req->object_id = object_id; + fetch_req->timer = -1; + fetch_req->manager_count = 0; + fetch_req->manager_vector = NULL; + return fetch_req; +} + void remove_fetch_request(plasma_manager_state *manager_state, - fetch_request2 *fetch_req) { + fetch_request *fetch_req) { /* Remove the fetch request from the table of fetch requests. */ - HASH_DELETE(hh, manager_state->fetch_requests2, fetch_req); + HASH_DELETE(hh, manager_state->fetch_requests, fetch_req); /* Remove the timer associated with this fetch request. */ if (fetch_req->timer != -1) { CHECK(event_loop_remove_timer(manager_state->loop, fetch_req->timer) == @@ -578,7 +458,6 @@ plasma_manager_state *init_plasma_manager_state(const char *store_socket_name, plasma_connect(store_socket_name, NULL, PLASMA_DEFAULT_RELEASE_DELAY); state->manager_connections = NULL; state->fetch_requests = NULL; - state->fetch_requests2 = NULL; state->object_wait_requests_local = NULL; state->object_wait_requests_remote = NULL; if (db_addr) { @@ -618,16 +497,8 @@ void destroy_plasma_manager_state(plasma_manager_state *state) { } if (state->fetch_requests != NULL) { - LOG_DEBUG("There were outstanding fetch requests."); - client_object_request *object_req, *tmp; - HASH_ITER(fetch_hh, state->fetch_requests, object_req, tmp) { - remove_object_request(object_req->client_conn, object_req); - } - } - - if (state->fetch_requests2 != NULL) { - fetch_request2 *fetch_req, *tmp; - HASH_ITER(hh, state->fetch_requests2, fetch_req, tmp) { + fetch_request *fetch_req, *tmp; + HASH_ITER(hh, state->fetch_requests, fetch_req, tmp) { remove_fetch_request(fetch_req->manager_state, fetch_req); } } @@ -932,43 +803,10 @@ void process_data_request(event_loop *loop, } } -void request_transfer_from(client_connection *client_conn, +void request_transfer_from(plasma_manager_state *manager_state, object_id object_id) { - client_object_request *object_req = - get_object_request(client_conn, object_id); - CHECK(object_req); - CHECK(object_req->manager_count > 0); - CHECK(object_req->next_manager >= 0 && - object_req->next_manager < object_req->manager_count); - char addr[16]; - int port; - parse_ip_addr_port(object_req->manager_vector[object_req->next_manager], addr, - &port); - - client_connection *manager_conn = - get_manager_connection(client_conn->manager_state, addr, port); - plasma_request_buffer *transfer_request = - malloc(sizeof(plasma_request_buffer)); - transfer_request->type = PLASMA_TRANSFER; - transfer_request->object_id = object_req->object_id; - - if (manager_conn->transfer_queue == NULL) { - /* If we already have a connection to this manager and its inactive, - * (re)register it with the event loop. */ - event_loop_add_file(client_conn->manager_state->loop, manager_conn->fd, - EVENT_LOOP_WRITE, send_queued_request, manager_conn); - } - /* Add this transfer request to this connection's transfer queue. */ - LL_APPEND(manager_conn->transfer_queue, transfer_request); - /* On the next attempt, try the next manager in manager_vector. */ - object_req->next_manager += 1; - object_req->next_manager %= object_req->manager_count; -} - -void request_transfer_from2(plasma_manager_state *manager_state, - object_id object_id) { - fetch_request2 *fetch_req; - HASH_FIND(hh, manager_state->fetch_requests2, &object_id, sizeof(object_id), + fetch_request *fetch_req; + HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id), fetch_req); /* TODO(rkn): This probably can be NULL so we should remove this check, and * instead return in the case where there is no fetch request. */ @@ -1003,25 +841,9 @@ void request_transfer_from2(plasma_manager_state *manager_state, } int manager_timeout_handler(event_loop *loop, timer_id id, void *context) { - client_object_request *object_req = context; - client_connection *client_conn = object_req->client_conn; - LOG_DEBUG("Timer went off, %d tries left", object_req->num_retries); - if (object_req->num_retries > 0) { - request_transfer_from(client_conn, object_req->object_id); - object_req->num_retries--; - return MANAGER_TIMEOUT; - } - plasma_reply reply = plasma_make_reply(object_req->object_id); - reply.has_object = 0; - send_client_reply(client_conn, &reply); - remove_object_request(client_conn, object_req); - return EVENT_LOOP_TIMER_DONE; -} - -int manager_timeout_handler2(event_loop *loop, timer_id id, void *context) { - fetch_request2 *fetch_req = context; + fetch_request *fetch_req = context; plasma_manager_state *manager_state = fetch_req->manager_state; - request_transfer_from2(manager_state, fetch_req->object_id); + request_transfer_from(manager_state, fetch_req->object_id); return MANAGER_TIMEOUT; } @@ -1032,66 +854,16 @@ bool is_object_local(plasma_manager_state *state, object_id object_id) { return entry != NULL; } -/* TODO(swang): Consolidate transfer requests for same object - * from different client IDs by passing in manager state, not - * client context. */ void request_transfer(object_id object_id, int manager_count, const char *manager_vector[], void *context) { - client_connection *client_conn = (client_connection *) context; - client_object_request *object_req = - get_object_request(client_conn, object_id); - /* If there's already an outstanding fetch for this object for this client, - * let the outstanding request finish the work. */ - if (object_req) { - return; - } - /* If the object isn't on any managers, report a failure to the client. */ - LOG_DEBUG("Object is on %d managers", manager_count); - if (manager_count == 0) { - /* TODO(swang): Instead of immediately counting this as a failure, maybe - * register a Redis callback for changes to this object table entry. */ - plasma_reply reply = plasma_make_reply(object_id); - reply.object_status = PLASMA_OBJECT_NONEXISTENT; - CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); - return; - } - /* Register the new outstanding fetch with the current client connection. */ - object_req = add_object_request(client_conn, object_id); - if (!object_req) { - LOG_DEBUG("Unable to allocate memory for object context."); - send_client_failure_reply(object_id, client_conn); - } - /* Pick a different manager to request a transfer from on every attempt. */ - object_req->manager_count = manager_count; - object_req->manager_vector = malloc(manager_count * sizeof(char *)); - memset(object_req->manager_vector, 0, manager_count * sizeof(char *)); - for (int i = 0; i < manager_count; ++i) { - int len = strlen(manager_vector[i]); - object_req->manager_vector[i] = malloc(len + 1); - strncpy(object_req->manager_vector[i], manager_vector[i], len); - object_req->manager_vector[i][len] = '\0'; - } - /* Wait for the object data for the default number of retries, which timeout - * after a default interval. */ - object_req->num_retries = NUM_RETRIES; - object_req->timer = - event_loop_add_timer(client_conn->manager_state->loop, MANAGER_TIMEOUT, - manager_timeout_handler, object_req); - request_transfer_from(client_conn, object_id); -} - -void request_transfer2(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context) { plasma_manager_state *manager_state = (plasma_manager_state *) context; /* This callback is called from object_table_subscribe, which guarantees that * the manager vector contains at least one element. */ CHECK(manager_count >= 1); - fetch_request2 *fetch_req; - HASH_FIND(hh, manager_state->fetch_requests2, &object_id, sizeof(object_id), + fetch_request *fetch_req; + HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id), fetch_req); if (is_object_local(manager_state, object_id)) { @@ -1128,285 +900,38 @@ void request_transfer2(object_id object_id, } /* Wait for the object data for the default number of retries, which timeout * after a default interval. */ - request_transfer_from2(manager_state, object_id); + request_transfer_from(manager_state, object_id); /* It is possible for this method to be called multiple times, but we only * need to create a timer once. */ if (fetch_req->timer == -1) { fetch_req->timer = event_loop_add_timer(manager_state->loop, MANAGER_TIMEOUT, - manager_timeout_handler2, fetch_req); + manager_timeout_handler, fetch_req); } } -void process_fetch_request(client_connection *client_conn, - object_id object_id) { - client_conn->is_wait = false; - client_conn->fetch1 = false; - client_conn->wait_reply = NULL; - plasma_reply reply = plasma_make_reply(object_id); - if (client_conn->manager_state->db == NULL) { - reply.has_object = 0; - send_client_reply(client_conn, &reply); - return; - } - /* Return success immediately if we already have this object. */ - if (is_object_local(client_conn->manager_state, object_id)) { - reply.has_object = 1; - send_client_reply(client_conn, &reply); - return; - } - retry_info retry = { - .num_retries = NUM_RETRIES, - .timeout = MANAGER_TIMEOUT, - .fail_callback = (table_fail_callback) send_client_failure_reply, - }; - /* Request a transfer from a plasma manager that has this object. */ - object_table_lookup(client_conn->manager_state->db, object_id, &retry, - request_transfer, client_conn); -} - -void process_fetch_requests(client_connection *client_conn, - int num_object_ids, - object_request object_requests[]) { - for (int i = 0; i < num_object_ids; ++i) { - ++client_conn->num_return_objects; - process_fetch_request(client_conn, object_requests[i].object_id); - } +/* This method is only called from the tests. */ +void call_request_transfer(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { + plasma_manager_state *manager_state = (plasma_manager_state *) context; + fetch_request *fetch_req; + /* Check that there isn't already a fetch request for this object. */ + HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id), + fetch_req); + CHECK(fetch_req == NULL); + /* Create a fetch request. */ + fetch_req = create_fetch_request(manager_state, object_id); + HASH_ADD(hh, manager_state->fetch_requests, object_id, + sizeof(fetch_req->object_id), fetch_req); + request_transfer(object_id, manager_count, manager_vector, context); } void fatal_table_callback(object_id id, void *user_context, void *user_data) { CHECK(0); } -void process_fetch_requests2(client_connection *client_conn, - int num_object_ids, - object_request object_requests[]) { - plasma_manager_state *manager_state = client_conn->manager_state; - for (int i = 0; i < num_object_ids; ++i) { - object_id obj_id = object_requests[i].object_id; - - /* Check if this object is already present locally. If so, do nothing. */ - if (is_object_local(manager_state, obj_id)) { - continue; - } - - /* Check if this object is already being fetched. If so, do nothing. */ - fetch_request2 *entry; - HASH_FIND(hh, manager_state->fetch_requests2, &obj_id, sizeof(obj_id), - entry); - if (entry != NULL) { - continue; - } - - /* Add an entry to the fetch requests data structure to indidate that the - * object is being fetched. */ - entry = malloc(sizeof(fetch_request2)); - entry->manager_state = manager_state; - entry->object_id = obj_id; - entry->timer = -1; - entry->manager_count = 0; - entry->manager_vector = NULL; - HASH_ADD(hh, manager_state->fetch_requests2, object_id, - sizeof(entry->object_id), entry); - - /* Get a list of Plasma Managers that have this object from the object - * table. If the list of Plasma Managers is non-empty, the callback should - * initiate a transfer. */ - /* TODO(rkn): Make sure this also handles the case where the list is - * initially empty. */ - retry_info retry; - memset(&retry, 0, sizeof(retry)); - retry.num_retries = 0; - retry.timeout = MANAGER_TIMEOUT; - retry.fail_callback = fatal_table_callback; - object_table_subscribe(manager_state->db, obj_id, request_transfer2, - manager_state, &retry, NULL, NULL); - } -} - -void return_from_wait(client_connection *client_conn) { - CHECK(client_conn->is_wait); - /* TODO: check for wait1. */ - client_conn->wait_reply->num_objects_returned = - client_conn->wait_reply->num_object_ids - client_conn->num_return_objects; - CHECK(plasma_send_reply(client_conn->fd, client_conn->wait_reply) >= 0); - plasma_free_reply(client_conn->wait_reply); - /* Clean the remaining object connections. */ - client_object_request *object_req, *tmp; - HASH_ITER(active_hh, client_conn->active_objects, object_req, tmp) { - remove_object_request(client_conn, object_req); - } -} - -int wait_timeout_handler(event_loop *loop, timer_id id, void *context) { - client_connection *client_conn = context; - CHECK(client_conn->timer_id == id); - return_from_wait(client_conn); - return EVENT_LOOP_TIMER_DONE; -} - -void process_wait_request(client_connection *client_conn, - int num_object_ids, - object_request object_requests[], - uint64_t timeout, - int num_ready_objects) { - plasma_manager_state *manager_state = client_conn->manager_state; - client_conn->num_return_objects = num_ready_objects; - client_conn->is_wait = true; - client_conn->wait1 = false; /* old wait */ - client_conn->fetch1 = false; - client_conn->timer_id = event_loop_add_timer( - manager_state->loop, timeout, wait_timeout_handler, client_conn); - client_conn->wait_reply = plasma_alloc_reply(num_ready_objects); - for (int i = 0; i < num_object_ids; ++i) { - available_object *entry; - HASH_FIND(hh, manager_state->local_available_objects, - &(object_requests[i].object_id), - sizeof(object_requests[i].object_id), entry); - if (entry) { - /* If an object id occurs twice in object_ids, this will count them twice. - * This might not be desirable behavior. */ - client_conn->num_return_objects -= 1; - client_conn->wait_reply->object_requests[client_conn->num_return_objects] - .object_id = entry->object_id; - if (client_conn->num_return_objects == 0) { - event_loop_remove_timer(manager_state->loop, client_conn->timer_id); - return_from_wait(client_conn); - return; - } - } else { - add_object_request(client_conn, object_requests[i].object_id); - } - } -} - -/** === START - ALTERNATE PLASMA CLIENT API === */ - -void return_from_wait1(client_connection *client_conn) { - CHECK(client_conn->is_wait); - CHECK(client_conn->wait1); - - CHECK(plasma_send_reply(client_conn->fd, client_conn->wait_reply) >= 0); - free(client_conn->wait_reply); - - /* Clean the remaining object connections. TODO(istoica): Check with Philipp. - */ - client_object_request *object_req, *tmp; - HASH_ITER(active_hh, client_conn->active_objects, object_req, tmp) { - remove_object_request(client_conn, object_req); - } -} - -int wait_timeout_handler1(event_loop *loop, timer_id id, void *context) { - client_connection *client_conn = context; - CHECK(client_conn->timer_id == id); - return_from_wait1(client_conn); - return EVENT_LOOP_TIMER_DONE; -} - -int wait_timeout_handler2(event_loop *loop, timer_id id, void *context) { - wait_request2 *wait_req = context; - return_from_wait2(wait_req->client_conn->manager_state, wait_req); - return EVENT_LOOP_TIMER_DONE; -} - -void process_wait_request1(client_connection *client_conn, - int num_object_requests, - object_request object_requests[], - uint64_t timeout, - int num_ready_objects) { - CHECK(client_conn != NULL); - - plasma_manager_state *manager_state = client_conn->manager_state; - client_conn->num_return_objects = num_ready_objects; - - /* We can only run a command at a time on any given client connection - * (client_conn) so set up is_wait so callback() can check whether we are - * still in wait(). */ - client_conn->is_wait = true; - client_conn->wait1 = true; /* new wait request */ - client_conn->fetch1 = false; - - client_conn->wait_reply = plasma_alloc_reply(num_object_requests); - object_requests_copy(num_object_requests, - client_conn->wait_reply->object_requests, - object_requests); - object_requests_set_status_all(num_object_requests, - client_conn->wait_reply->object_requests, - PLASMA_OBJECT_NONEXISTENT); - /* We will just return back the same object_requests list after setting the - * status of the requests. */ - client_conn->wait_reply->num_object_ids = num_object_requests; - - /* Add timer callback. If timeout expires, it invokes wait_timeout_handler(). - * If we get num_ready_objects before timeout expires, we remove the timer. */ - client_conn->timer_id = event_loop_add_timer( - manager_state->loop, timeout, wait_timeout_handler1, client_conn); - - /* Now check whether objects are in the Local Object store, and if not, check - * whether they are remote. */ - for (int i = 0; i < num_object_requests; ++i) { - if (is_object_local(manager_state, object_requests[i].object_id)) { - /* If an object ID occurs twice in object_requests, this will count them - * twice. This might not be desirable behavior. */ - client_conn->num_return_objects -= 1; - client_conn->wait_reply->object_requests[i].status = PLASMA_OBJECT_LOCAL; - if (client_conn->num_return_objects == 0) { - /* We got num_return_objects in the local Object Store, so return. */ - event_loop_remove_timer(manager_state->loop, client_conn->timer_id); - return_from_wait1(client_conn); - return; - } - } else { - object_request *object_request = - &client_conn->wait_reply->object_requests[i]; - - if (object_request->status == PLASMA_OBJECT_NONEXISTENT) { - if (get_object_request(client_conn, object_request->object_id)) { - /* This object is in transfer, which means that it is stored on a - * remote node. */ - client_conn->wait_reply->object_requests[i].status = - PLASMA_OBJECT_REMOTE; - if (client_conn->wait_reply->object_requests[i].type == - PLASMA_QUERY_ANYWHERE) { - client_conn->num_return_objects -= 1; - if (client_conn->num_return_objects == 0) { - /* We got num_return_objects in the local Object Store, so return. - */ - event_loop_remove_timer(manager_state->loop, - client_conn->timer_id); - return_from_wait1(client_conn); - return; - } - } - } - /* Subscribe to hear when object becomes available. */ - retry_info retry_subscribe = { - .num_retries = 0, .timeout = 0, .fail_callback = NULL, - }; - /* TODO(istoica): We should really cache the results here. */ - object_table_subscribe( - g_manager_state->db, - client_conn->wait_reply->object_requests[i].object_id, - wait_object_available_callback, (void *) client_conn, - &retry_subscribe, NULL, NULL); - /* TODO(istoica): Since the existing subscribe doesn't return when the - * object already exists in the Object Table, do a lookup as well. */ - retry_info retry_lookup = { - .num_retries = NUM_RETRIES, - .timeout = MANAGER_TIMEOUT, - .fail_callback = NULL, - }; - - object_table_lookup( - client_conn->manager_state->db, - client_conn->wait_reply->object_requests[i].object_id, - &retry_lookup, wait_object_lookup_callback, client_conn); - } - } - } -} - void object_present_callback(object_id object_id, int manager_count, const char *manager_vector[], @@ -1421,17 +946,83 @@ void object_present_callback(object_id object_id, PLASMA_OBJECT_REMOTE); } -void process_wait_request2(client_connection *client_conn, - int num_object_requests, - object_request object_requests[], - uint64_t timeout_ms, - int num_ready_objects) { +/* This callback is used by both fetch and wait. Therefore, it may have to + * handle outstanding fetch and wait requests. */ +void object_table_subscribe_callback(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { + plasma_manager_state *manager_state = (plasma_manager_state *) context; + /* Run the callback for fetch requests if there is a fetch request. */ + fetch_request *fetch_req; + HASH_FIND(hh, manager_state->fetch_requests, &object_id, sizeof(object_id), + fetch_req); + if (fetch_req != NULL) { + request_transfer(object_id, manager_count, manager_vector, context); + } + /* Run the callback for wait requests. */ + object_present_callback(object_id, manager_count, manager_vector, context); +} + +void process_fetch_requests(client_connection *client_conn, + int num_object_ids, + object_request object_requests[]) { + plasma_manager_state *manager_state = client_conn->manager_state; + for (int i = 0; i < num_object_ids; ++i) { + object_id obj_id = object_requests[i].object_id; + + /* Check if this object is already present locally. If so, do nothing. */ + if (is_object_local(manager_state, obj_id)) { + continue; + } + + /* Check if this object is already being fetched. If so, do nothing. */ + fetch_request *entry; + HASH_FIND(hh, manager_state->fetch_requests, &obj_id, sizeof(obj_id), + entry); + if (entry != NULL) { + continue; + } + + /* Add an entry to the fetch requests data structure to indidate that the + * object is being fetched. */ + entry = create_fetch_request(manager_state, obj_id); + HASH_ADD(hh, manager_state->fetch_requests, object_id, + sizeof(entry->object_id), entry); + + /* Get a list of Plasma Managers that have this object from the object + * table. If the list of Plasma Managers is non-empty, the callback should + * initiate a transfer. */ + /* TODO(rkn): Make sure this also handles the case where the list is + * initially empty. */ + retry_info retry; + memset(&retry, 0, sizeof(retry)); + retry.num_retries = 0; + retry.timeout = MANAGER_TIMEOUT; + retry.fail_callback = fatal_table_callback; + object_table_subscribe(manager_state->db, obj_id, + object_table_subscribe_callback, manager_state, + &retry, NULL, NULL); + } +} + +int wait_timeout_handler(event_loop *loop, timer_id id, void *context) { + wait_request *wait_req = context; + return_from_wait(wait_req->client_conn->manager_state, wait_req); + return EVENT_LOOP_TIMER_DONE; +} + +void process_wait_request(client_connection *client_conn, + int num_object_requests, + object_request object_requests[], + uint64_t timeout_ms, + int num_ready_objects) { CHECK(client_conn != NULL); plasma_manager_state *manager_state = client_conn->manager_state; /* Create a wait request for this object. */ - wait_request2 *wait_req = malloc(sizeof(wait_request2)); - memset(wait_req, 0, sizeof(wait_request2)); + wait_request *wait_req = malloc(sizeof(wait_request)); + memset(wait_req, 0, sizeof(wait_request)); wait_req->client_conn = client_conn; wait_req->timer = -1; wait_req->num_object_requests = num_object_requests; @@ -1478,8 +1069,9 @@ void process_wait_request2(client_connection *client_conn, * timer). */ retry.timeout = 100000; retry.fail_callback = fatal_table_callback; - object_table_subscribe(manager_state->db, obj_id, object_present_callback, - manager_state, &retry, NULL, NULL); + object_table_subscribe(manager_state->db, obj_id, + object_table_subscribe_callback, manager_state, + &retry, NULL, NULL); } else { /* This code should be unreachable. */ CHECK(0); @@ -1489,133 +1081,13 @@ void process_wait_request2(client_connection *client_conn, /* If enough of the wait requests have already been satisfied, return to the * client. */ if (wait_req->num_satisfied >= wait_req->num_objects_to_wait_for) { - return_from_wait2(manager_state, wait_req); + return_from_wait(manager_state, wait_req); return; } /* Set a timer that will cause the wait request to return to the client. */ wait_req->timer = event_loop_add_timer(manager_state->loop, timeout_ms, - wait_timeout_handler2, wait_req); -} - -/* TODO(pcm): unify with wait_object_available_callback. */ -void wait_object_lookup_callback(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context) { - if (manager_count > 0) { - wait_object_available_callback(object_id, manager_count, manager_vector, - context); - } -} - -void wait_object_available_callback(object_id object_id, - int manager_count, - const char *manager_vector[], - void *user_context) { - client_connection *client_conn = (client_connection *) user_context; - CHECK(client_conn != NULL); - plasma_manager_state *manager_state = client_conn->manager_state; - CHECK(manager_state); - - if ((!client_conn->is_wait) || (!client_conn->wait1)) { - return; - } - - plasma_reply *wait_reply = client_conn->wait_reply; - object_request *object_request; - object_request = object_requests_get_object( - object_id, wait_reply->num_object_ids, wait_reply->object_requests); - if (object_request == NULL) { - /* Maybe this is from a previous wait call, so ignore it. */ - return; - } - - /* Check first whether object is avilable in the local Plasma Store. */ - if (is_object_local(manager_state, object_id)) { - client_conn->num_return_objects -= 1; - object_request->status = PLASMA_OBJECT_LOCAL; - } else { - object_request->status = PLASMA_OBJECT_REMOTE; - if (object_request->type == PLASMA_QUERY_ANYWHERE) { - client_conn->num_return_objects -= 1; - } - } - - if (client_conn->num_return_objects == 0) { - /* We got num_return_objects in the local Object Store, so return. */ - event_loop_remove_timer(manager_state->loop, client_conn->timer_id); - return_from_wait1(client_conn); - } -} - -void wait_process_object_available_local(client_connection *client_conn, - object_id object_id) { - CHECK(client_conn != NULL); - if (!client_conn->is_wait) { - return; - } - - plasma_reply *wait_reply = client_conn->wait_reply; - object_request *object_request; - object_request = object_requests_get_object( - object_id, wait_reply->num_object_ids, wait_reply->object_requests); - if (object_request) { - client_conn->num_return_objects -= 1; - object_request->status = PLASMA_OBJECT_LOCAL; - } -} - -/** - * Handler handling the timeout experiation of a transfer request. - * - * @param loop Event loop. - * @param timer_id ID of the timer which has expired. - * @param contect Client connection. - * @return Void. - */ -int fetch_timeout_handler(event_loop *loop, timer_id id, void *context) { - CHECK(loop); - CHECK(context); - - client_object_request *object_req = context; - client_connection *client_conn = object_req->client_conn; - - LOG_DEBUG("Timer went off, %d tries left", object_req->num_retries); - - if (object_req->num_retries > 0) { - request_transfer_from(client_conn, object_req->object_id); - object_req->num_retries--; - return MANAGER_TIMEOUT; - } - plasma_reply reply = plasma_make_reply(object_req->object_id); - reply.object_status = PLASMA_OBJECT_NONEXISTENT; - CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); - - remove_object_request(client_conn, object_req); - return EVENT_LOOP_TIMER_DONE; -} - -/** - * Request the transfer from a remote node. - * - * @param object_id ID of the object to transfer. - * @param manager_cont Number of remote nodes object_id is stored at. - * @param manager_vector Array containing the Plasma Managers running at the - * nodes where object_id is stored. - * @param context Client connection. - * @return Void. - */ -void request_fetch_initiate(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context) { - client_connection *client_conn = (client_connection *) context; - int status = request_fetch_or_status(object_id, manager_count, manager_vector, - context, true); - plasma_reply reply = plasma_make_reply(object_id); - reply.object_status = status; - CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); + wait_timeout_handler, wait_req); } /** @@ -1638,71 +1110,24 @@ void request_status_done(object_id object_id, const char *manager_vector[], void *context) { client_connection *client_conn = (client_connection *) context; - int status = request_fetch_or_status(object_id, manager_count, manager_vector, - context, false); + int status = + request_status(object_id, manager_count, manager_vector, context); plasma_reply reply = plasma_make_reply(object_id); reply.object_status = status; CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); } -int request_fetch_or_status(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context, - bool fetch) { +int request_status(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context) { client_connection *client_conn = (client_connection *) context; - client_object_request *object_req = - get_object_request(client_conn, object_id); /* Return success immediately if we already have this object. */ if (is_object_local(client_conn->manager_state, object_id)) { return PLASMA_OBJECT_LOCAL; } - /* Check wether there's already an outstanding fetch for this object for this - * client, and if yes let the outstanding request finish the work. Note that - * we have already checked for this in process_fetch_or_status_request(), but - * we need to check again here as the object could hav been evicted since - * then. */ - if (object_req) { - return PLASMA_OBJECT_IN_TRANSFER; - } - - /* If the object isn't on any managers, report a failure to the client. */ - LOG_DEBUG("Object is on %d managers", manager_count); - if (manager_count == 0) { - if (object_req) { - remove_object_request(client_conn, object_req); - } - return PLASMA_OBJECT_NONEXISTENT; - } - - if (fetch) { - /* Register the new outstanding fetch with the current client connection. */ - object_req = add_object_request(client_conn, object_id); - CHECKM(object_req != NULL, "Unable to allocate memory for object context."); - - /* Pick a different manager to request a transfer from on every attempt. */ - object_req->manager_count = manager_count; - object_req->manager_vector = malloc(manager_count * sizeof(char *)); - memset(object_req->manager_vector, 0, manager_count * sizeof(char *)); - for (int i = 0; i < manager_count; ++i) { - int len = strlen(manager_vector[i]); - object_req->manager_vector[i] = malloc(len + 1); - strncpy(object_req->manager_vector[i], manager_vector[i], len); - object_req->manager_vector[i][len] = '\0'; - } - /* Wait for the object data for the default number of retries, which timeout - * after a default interval. */ - object_req->num_retries = NUM_RETRIES; - object_req->object_id = object_id; - object_req->timer = - event_loop_add_timer(client_conn->manager_state->loop, MANAGER_TIMEOUT, - fetch_timeout_handler, object_req); - request_transfer_from(client_conn, object_id); - /* Let scheduling the fetch request proceded and return. */ - }; - /* Since object is not stored at the local locally, manager_count > 0 means * that the object is stored at another remote object. Otherwise, if * manager_count == 0, the object is not stored anywhere. */ @@ -1717,11 +1142,8 @@ void object_table_lookup_fail_callback(object_id object_id, CHECK(0); } -void process_fetch_or_status_request(client_connection *client_conn, - object_id object_id, - bool fetch) { - client_conn->is_wait = false; - client_conn->fetch1 = true; +void process_status_request(client_connection *client_conn, + object_id object_id) { client_conn->wait_reply = NULL; /* Return success immediately if we already have this object. */ @@ -1732,14 +1154,6 @@ void process_fetch_or_status_request(client_connection *client_conn, return; } - /* Check whether a transfer request for this object is already pending. */ - if (get_object_request(client_conn, object_id)) { - plasma_reply reply = plasma_make_reply(object_id); - reply.object_status = PLASMA_OBJECT_IN_TRANSFER; - CHECK(plasma_send_reply(client_conn->fd, &reply) >= 0); - return; - } - if (client_conn->manager_state->db == NULL) { plasma_reply reply = plasma_make_reply(object_id); reply.object_status = PLASMA_OBJECT_NONEXISTENT; @@ -1754,18 +1168,10 @@ void process_fetch_or_status_request(client_connection *client_conn, .fail_callback = object_table_lookup_fail_callback, }; - if (fetch) { - /* Request a transfer from a plasma manager that has this object, if any. */ - object_table_lookup(client_conn->manager_state->db, object_id, &retry, - request_fetch_initiate, client_conn); - } else { - object_table_lookup(client_conn->manager_state->db, object_id, &retry, - request_status_done, client_conn); - } + object_table_lookup(client_conn->manager_state->db, object_id, &retry, + request_status_done, client_conn); } -/* === END - ALTERNATE PLASMA CLIENT API === */ - void process_object_notification(event_loop *loop, int client_sock, void *context, @@ -1808,8 +1214,8 @@ void process_object_notification(event_loop *loop, } /* If we were trying to fetch this object, finish up the fetch request. */ - fetch_request2 *fetch_req; - HASH_FIND(hh, state->fetch_requests2, &obj_id, sizeof(obj_id), fetch_req); + fetch_request *fetch_req; + HASH_FIND(hh, state->fetch_requests, &obj_id, sizeof(obj_id), fetch_req); if (fetch_req != NULL) { remove_fetch_request(state, fetch_req); /* TODO(rkn): We also really should unsubscribe from the object table. */ @@ -1820,46 +1226,6 @@ void process_object_notification(event_loop *loop, PLASMA_OBJECT_LOCAL); update_object_wait_requests(state, obj_id, PLASMA_QUERY_ANYWHERE, PLASMA_OBJECT_LOCAL); - - /* Notify any clients who were waiting on a fetch to this object and tick - * off objects we are waiting for. */ - client_object_request *object_req, *next; - client_connection *client_conn; - HASH_FIND(fetch_hh, state->fetch_requests, &obj_id, sizeof(object_id), - object_req); - plasma_reply reply = plasma_make_reply(obj_id); - reply.has_object = 1; - while (object_req) { - next = object_req->next; - client_conn = object_req->client_conn; - if (!client_conn->is_wait) { - event_loop_remove_timer(state->loop, object_req->timer); - if (!client_conn->fetch1) { - send_client_reply(client_conn, &reply); - } - } else { - if (client_conn->wait1) { - wait_process_object_available_local(client_conn, obj_id); - } else { - client_conn->num_return_objects -= 1; - client_conn->wait_reply - ->object_requests[client_conn->num_return_objects] - .object_id = obj_id; - } - if (client_conn->num_return_objects == 0) { - event_loop_remove_timer(loop, client_conn->timer_id); - if (client_conn->wait1) { - return_from_wait1(client_conn); - } else { - return_from_wait(client_conn); - } - object_req = next; - continue; - } - } - remove_object_request(client_conn, object_req); - object_req = next; - } } void process_message(event_loop *loop, @@ -1885,39 +1251,18 @@ void process_message(event_loop *loop, req->data_size, req->metadata_size, conn); break; case PLASMA_FETCH: - LOG_DEBUG("Processing fetch"); + LOG_DEBUG("Processing fetch remote"); process_fetch_requests(conn, req->num_object_ids, req->object_requests); break; - case PLASMA_FETCH_REMOTE: - LOG_DEBUG("Processing fetch remote"); - DCHECK(req->num_object_ids == 1); - process_fetch_or_status_request(conn, req->object_requests[0].object_id, - true); - break; - case PLASMA_FETCH2: - LOG_DEBUG("Processing fetch remote"); - process_fetch_requests2(conn, req->num_object_ids, req->object_requests); - break; case PLASMA_WAIT: LOG_DEBUG("Processing wait"); process_wait_request(conn, req->num_object_ids, req->object_requests, req->timeout, req->num_ready_objects); break; - case PLASMA_WAIT1: - LOG_DEBUG("Processing wait1"); - process_wait_request1(conn, req->num_object_ids, req->object_requests, - req->timeout, req->num_ready_objects); - break; - case PLASMA_WAIT2: - LOG_DEBUG("Processing wait2"); - process_wait_request2(conn, req->num_object_ids, req->object_requests, - req->timeout, req->num_ready_objects); - break; case PLASMA_STATUS: LOG_DEBUG("Processing status"); DCHECK(req->num_object_ids == 1); - process_fetch_or_status_request(conn, req->object_requests[0].object_id, - false); + process_status_request(conn, req->object_requests[0].object_id); break; case DISCONNECT_CLIENT: { LOG_INFO("Disconnecting client on fd %d", client_sock); diff --git a/src/plasma/plasma_manager.h b/src/plasma/plasma_manager.h index a8b63c5ef..38979c148 100644 --- a/src/plasma/plasma_manager.h +++ b/src/plasma/plasma_manager.h @@ -111,58 +111,6 @@ void process_data_chunk(event_loop *loop, void *context, int events); -/** - * Process a fetch request. The fetch request tries: - * 1) If there is no connection to the database, return faliure to the client. - * 2) If the object is available locally, return success to the client. - * 3) Query the database for plasma managers that the object might be on. - * 4) Request a transfer from each of the managers that the object might be on - * until we receive the data, or until we timeout. - * 5) Returns success or failure to the client depending on whether we received - * the data or not. - * - * @param client_conn The connection context for the client that made the - * request. - * @param object_id The object ID requested. - * @return Void. - */ -void process_fetch_request(client_connection *client_conn, object_id object_id); - -/** - * Process a fetch request for multiple objects. The success of each object - * will be written back individually to the socket connected to the client that - * made the request in a plasma_reply. See documentation for - * process_fetch_request for the sequence of operations per object. - * - * @param client_conn The connection context for the client that made the - * request. - * @param num_object_ids The number of object IDs requested. - * @param object_requests[] The object requests fetch is called on. - * @return Void. - */ -void process_fetch_requests(client_connection *client_conn, - int num_object_ids, - object_request object_requests[]); - -/** - * Process a wait request from a client. - * - * @param client_conn The connection context for the client that made the - * request. - * @param num_object_ids Number of object IDs wait is called on. - * @param object_requests The object requests wait is called on. - * @param timeout Wait will time out and return after this number of - * milliseconds. - * @param num_returns Number of object IDs wait will return if it doesn't time - * out. - * @return Void. - */ -void process_wait_request(client_connection *client_conn, - int num_object_ids, - object_request object_requests[], - uint64_t timeout, - int num_returns); - /** * Callback that will be called when a new object becomes available. * @@ -232,36 +180,21 @@ struct plasma_request_buffer { }; /** - * Create a new context for the given object ID with the given - * client connection and register it with the manager's - * outstanding fetch or wait requests and the client - * connection's active object contexts. + * Call the request_transfer method, which well attempt to get an object from + * a remote Plasma manager. If it is unable to get it from another Plasma + * manager, it will cycle through a list of Plasma managers that have the + * object. * - * @param client_conn The client connection context. - * @param object_id The object ID whose context we want to - * create. - * @return A pointer to the newly created object context. + * @param object_id The object ID of the object to transfer. + * @param manager_count The number of managers that have the object. + * @param manager_vector The Plasma managers that have the object. + * @param context The plasma manager state. + * @return Void. */ -client_object_request *add_object_request(client_connection *client_conn, - object_id object_id); - -/** - * Given an object ID and the managers it can be found on, start requesting a - * transfer from the managers. - * - * @param object_id The object ID we want to request a transfer of. - * @param manager_count The number of managers the object can be found on. - * @param manager_vector A vector of the IP addresses of the managers that the - * object can be found on. - * @param context The context for the connection to this client. - * - * Initializes a new context for this client and object. Managers are tried in - * order until we receive the data or we timeout and run out of retries. - */ -void request_transfer(object_id object_id, - int manager_count, - const char *manager_vector[], - void *context); +void call_request_transfer(object_id object_id, + int manager_count, + const char *manager_vector[], + void *context); /** * Clean up and free an active object context. Deregister it from the @@ -326,49 +259,4 @@ event_loop *get_event_loop(plasma_manager_state *state); */ int get_client_sock(client_connection *conn); -/** - * Process a wait request from a client. - * - * @param client_conn The connection context for the client that made the - * request. - * @param num_object_requests Number of object requests wait is called on. - * @param object_requests The array of bject requests wait is called on. - * @param timeout Wait will time out and return after this number of - * milliseconds. - * @param num_returns Number of object requests that will be satsified before - * wait will retunr, unless it timeouts. - * @return Void. - */ -void process_wait_request1(client_connection *client_conn, - int num_object_requests, - object_request object_requests[], - uint64_t timeout, - int num_ready_objects); - -/** - * Callback to be invoked when object_id entry is changed in the - * Object Table. We assume that the change means the object is available. - * - * @param object_id ID of the object becoming available locally or remotely. - * @param user_context This is the client connection on which the wait has been - * called. - * @return Void. - */ -void wait_object_available_callback(object_id object_id, - int manager_count, - const char *manager_vector[], - void *user_context); - -/** - * Object is available (sealed) in the local Object Store. This is part of - * executing wait operation. - * - * @param client_conn The client conection. - * @param user_context This is the client connection on which the wait has been - * called. - * @return Void. - */ -void wait_process_object_available_local(client_connection *client_conn, - object_id object_id); - #endif /* PLASMA_MANAGER_H */ diff --git a/src/plasma/test/client_tests.c b/src/plasma/test/client_tests.c index 2199a8c24..d0f5d869f 100644 --- a/src/plasma/test/client_tests.c +++ b/src/plasma/test/client_tests.c @@ -44,7 +44,7 @@ TEST plasma_status_tests(void) { PASS(); } -TEST plasma_fetch_remote_tests(void) { +TEST plasma_fetch_tests(void) { plasma_connection *plasma_conn1 = plasma_connect( "/tmp/store1", "/tmp/manager1", PLASMA_DEFAULT_RELEASE_DELAY); plasma_connection *plasma_conn2 = plasma_connect( @@ -55,7 +55,7 @@ TEST plasma_fetch_remote_tests(void) { int status; /* No object in the system */ - status = plasma_fetch_remote(plasma_conn1, oid1); + status = plasma_status(plasma_conn1, oid1); ASSERT(status == PLASMA_OBJECT_NONEXISTENT); /* Test for the object being in local Plasma store. */ @@ -70,23 +70,26 @@ TEST plasma_fetch_remote_tests(void) { /* Object with ID oid1 has been just inserted. On the next fetch we might * either find the object or not, depending on whether the Plasma Manager has * received the notification from the Plasma Store or not. */ - status = plasma_fetch_remote(plasma_conn1, oid1); + object_id oid_array1[1] = {oid1}; + plasma_fetch(plasma_conn1, 1, oid_array1); + status = plasma_status(plasma_conn1, oid1); ASSERT((status == PLASMA_OBJECT_LOCAL) || (status == PLASMA_OBJECT_NONEXISTENT)); /* Sleep to make sure Plasma Manager got the notification. */ sleep(1); - status = plasma_fetch_remote(plasma_conn1, oid1); + status = plasma_status(plasma_conn1, oid1); ASSERT(status == PLASMA_OBJECT_LOCAL); /* Test for object being remote. */ - status = plasma_fetch_remote(plasma_conn2, oid1); + status = plasma_status(plasma_conn2, oid1); ASSERT(status == PLASMA_OBJECT_REMOTE); /* Sleep to make sure the object has been fetched and it is now stored in the * local Plasma Store. */ + plasma_fetch(plasma_conn2, 1, oid_array1); sleep(1); - status = plasma_fetch_remote(plasma_conn2, oid1); + status = plasma_status(plasma_conn2, oid1); ASSERT(status == PLASMA_OBJECT_LOCAL); sleep(1); @@ -160,8 +163,8 @@ TEST plasma_wait_for_objects_tests(void) { struct timeval start, end; gettimeofday(&start, NULL); - int n = plasma_wait_for_objects2(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, - NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + int n = plasma_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, + NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); ASSERT(n == 0); gettimeofday(&end, NULL); float diff_ms = (end.tv_sec - start.tv_sec); @@ -177,30 +180,30 @@ TEST plasma_wait_for_objects_tests(void) { plasma_create(plasma_conn1, oid1, data_size, metadata, metadata_size, &data); plasma_seal(plasma_conn1, oid1); - n = plasma_wait_for_objects2(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, - NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + n = plasma_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST, + WAIT_TIMEOUT_MS); ASSERT(n == 1); /* Create and insert an object in plasma_conn2. */ plasma_create(plasma_conn2, oid2, data_size, metadata, metadata_size, &data); plasma_seal(plasma_conn2, oid2); - n = plasma_wait_for_objects2(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, - NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + n = plasma_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST, + WAIT_TIMEOUT_MS); ASSERT(n == 2); - n = plasma_wait_for_objects2(plasma_conn2, NUM_OBJ_REQUEST, obj_requests, - NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + n = plasma_wait(plasma_conn2, NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST, + WAIT_TIMEOUT_MS); ASSERT(n == 2); obj_requests[0].type = PLASMA_QUERY_LOCAL; obj_requests[1].type = PLASMA_QUERY_LOCAL; - n = plasma_wait_for_objects2(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, - NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + n = plasma_wait(plasma_conn1, NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST, + WAIT_TIMEOUT_MS); ASSERT(n == 1); - n = plasma_wait_for_objects2(plasma_conn2, NUM_OBJ_REQUEST, obj_requests, - NUM_OBJ_REQUEST, WAIT_TIMEOUT_MS); + n = plasma_wait(plasma_conn2, NUM_OBJ_REQUEST, obj_requests, NUM_OBJ_REQUEST, + WAIT_TIMEOUT_MS); ASSERT(n == 1); plasma_disconnect(plasma_conn1); @@ -360,7 +363,7 @@ TEST plasma_multiget_tests(void) { SUITE(plasma_client_tests) { RUN_TEST(plasma_status_tests); - RUN_TEST(plasma_fetch_remote_tests); + RUN_TEST(plasma_fetch_tests); RUN_TEST(plasma_get_local_tests); RUN_TEST(plasma_wait_for_objects_tests); RUN_TEST(plasma_get_tests); diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index af384c1c8..397e5d7c5 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -157,7 +157,7 @@ TEST request_transfer_test(void) { utstring_new(addr); utstring_printf(addr, "127.0.0.1:%d", remote_mock->port); manager_vector[0] = utstring_body(addr); - request_transfer(oid, 1, manager_vector, local_mock->client_conn); + call_request_transfer(oid, 1, manager_vector, local_mock->state); free(manager_vector); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, test_done_handler, local_mock->state); @@ -203,7 +203,7 @@ TEST request_transfer_retry_test(void) { utstring_new(addr1); utstring_printf(addr1, "127.0.0.1:%d", remote_mock2->port); manager_vector[1] = utstring_body(addr1); - request_transfer(oid, 2, manager_vector, local_mock->client_conn); + call_request_transfer(oid, 2, manager_vector, local_mock->state); free(manager_vector); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler, local_mock->state); @@ -227,45 +227,6 @@ TEST request_transfer_retry_test(void) { PASS(); } -/** - * This test checks correct behavior of request_transfer in a failure scenario. - * Specifically, when one plasma manager calls request_transfer, and the remote - * manager that holds the object is unreachable, the client should receive the - * failure message after all the retries have timed out. - * - Buffer a transfer request for the remote manager. - * - Start and stop the event loop after NUM_RETRIES timeouts to make sure that - * we trigger all the retries. - * - Expect to see a response on the plasma client saying that the object - * wasn't fetched. - */ -TEST request_transfer_timeout_test(void) { - plasma_mock *local_mock = init_plasma_mock(NULL); - plasma_mock *remote_mock = init_plasma_mock(local_mock); - const char **manager_vector = malloc(sizeof(char *)); - UT_string *addr = NULL; - utstring_new(addr); - utstring_printf(addr, "127.0.0.1:%d", remote_mock->port); - manager_vector[0] = utstring_body(addr); - request_transfer(oid, 1, manager_vector, local_mock->client_conn); - free(manager_vector); - event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * (NUM_RETRIES + 2), - test_done_handler, local_mock->state); - event_loop_run(local_mock->loop); - - plasma_reply reply; - int manager_fd = get_manager_fd(local_mock->plasma_conn); - int nbytes = recv(manager_fd, (uint8_t *) &reply, sizeof(reply), MSG_WAITALL); - ASSERT_EQ(nbytes, sizeof(reply)); - ASSERT_EQ(reply.num_object_ids, 1); - ASSERT(object_ids_equal(oid, reply.object_requests[0].object_id)); - ASSERT_EQ(reply.has_object, 0); - /* Clean up. */ - utstring_free(addr); - destroy_plasma_mock(remote_mock); - destroy_plasma_mock(local_mock); - PASS(); -} - /** * This test checks correct behavior of reading and writing an object chunk * from one manager to another. @@ -317,7 +278,6 @@ SUITE(plasma_manager_tests) { memset(&oid, 1, sizeof(oid)); RUN_TEST(request_transfer_test); RUN_TEST(request_transfer_retry_test); - RUN_TEST(request_transfer_timeout_test); RUN_TEST(read_write_object_chunk_test); } diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 391d13284..17327e8b3 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -401,85 +401,26 @@ class TestPlasmaManager(unittest.TestCase): self.redis_process.kill() def test_fetch(self): - if self.redis_process is None: - print("Cannot test fetch without a running redis instance.") - self.assertTrue(False) - for _ in range(100): - # Create an object. - object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000) - # Fetch the object from the other plasma store. - # TODO(swang): This line is a hack! It makes sure that the entry will be - # in the object table once we call the fetch operation. Remove once - # retries are implemented by Ray common. - time.sleep(0.1) - successes = self.client2.fetch([object_id1]) - self.assertEqual(successes, [True]) - # Compare the two buffers. - assert_get_object_equal(self, self.client1, self.client2, object_id1, - memory_buffer=memory_buffer1, metadata=metadata1) - # Fetch in the other direction. These should return quickly because - # client1 already has the object. - successes = self.client1.fetch([object_id1]) - self.assertEqual(successes, [True]) - assert_get_object_equal(self, self.client2, self.client1, object_id1, - memory_buffer=memory_buffer1, metadata=metadata1) - - def test_fetch_multiple(self): - if self.redis_process is None: - print("Cannot test fetch without a running redis instance.") - self.assertTrue(False) - for _ in range(20): - # Create two objects and a third fake one that doesn't exist. - object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000) - missing_object_id = random_object_id() - object_id2, memory_buffer2, metadata2 = create_object(self.client1, 2000, 2000) - object_ids = [object_id1, missing_object_id, object_id2] - # Fetch the objects from the other plasma store. The second object ID - # should timeout since it does not exist. - # TODO(swang): This line is a hack! It makes sure that the entry will be - # in the object table once we call the fetch operation. Remove once - # retries are implemented by Ray common. - time.sleep(0.1) - successes = self.client2.fetch(object_ids) - self.assertEqual(successes, [True, False, True]) - # Compare the buffers of the objects that do exist. - assert_get_object_equal(self, self.client1, self.client2, object_id1, - memory_buffer=memory_buffer1, metadata=metadata1) - assert_get_object_equal(self, self.client1, self.client2, object_id2, - memory_buffer=memory_buffer2, metadata=metadata2) - # Fetch in the other direction. The fake object still does not exist. - successes = self.client1.fetch(object_ids) - self.assertEqual(successes, [True, False, True]) - assert_get_object_equal(self, self.client2, self.client1, object_id1, - memory_buffer=memory_buffer1, metadata=metadata1) - assert_get_object_equal(self, self.client2, self.client1, object_id2, - memory_buffer=memory_buffer2, metadata=metadata2) - - # Check that calling fetch with the same object ID fails. - object_id = random_object_id() - self.assertRaises(Exception, lambda : self.client1.fetch([object_id, object_id])) - - def test_fetch2(self): if self.redis_process is None: print("Cannot test fetch without a running redis instance.") self.assertTrue(False) for _ in range(10): # Create an object. object_id1, memory_buffer1, metadata1 = create_object(self.client1, 2000, 2000) - self.client1.fetch2([object_id1]) + self.client1.fetch([object_id1]) self.assertEqual(self.client1.contains(object_id1), True) self.assertEqual(self.client2.contains(object_id1), False) # Fetch the object from the other plasma manager. # TODO(rkn): Right now we must wait for the object table to be updated. while not self.client2.contains(object_id1): - self.client2.fetch2([object_id1]) + self.client2.fetch([object_id1]) # Compare the two buffers. assert_get_object_equal(self, self.client1, self.client2, object_id1, memory_buffer=memory_buffer1, metadata=metadata1) # Test that we can call fetch on object IDs that don't exist yet. object_id2 = random_object_id() - self.client1.fetch2([object_id2]) + self.client1.fetch([object_id2]) self.assertEqual(self.client1.contains(object_id2), False) memory_buffer2, metadata2 = create_object_with_id(self.client2, object_id2, 2000, 2000) # # Check that the object has been fetched. @@ -493,19 +434,19 @@ class TestPlasmaManager(unittest.TestCase): self.assertEqual(self.client1.contains(object_id3), False) self.assertEqual(self.client2.contains(object_id3), False) for _ in range(10): - self.client1.fetch2([object_id3]) - self.client2.fetch2([object_id3]) + self.client1.fetch([object_id3]) + self.client2.fetch([object_id3]) memory_buffer3, metadata3 = create_object_with_id(self.client1, object_id3, 2000, 2000) for _ in range(10): - self.client1.fetch2([object_id3]) - self.client2.fetch2([object_id3]) + self.client1.fetch([object_id3]) + self.client2.fetch([object_id3]) #TODO(rkn): Right now we must wait for the object table to be updated. while not self.client2.contains(object_id3): - self.client2.fetch2([object_id3]) + self.client2.fetch([object_id3]) assert_get_object_equal(self, self.client1, self.client2, object_id3, memory_buffer=memory_buffer3, metadata=metadata3) - def test_fetch2_multiple(self): + def test_fetch_multiple(self): if self.redis_process is None: print("Cannot test fetch without a running redis instance.") self.assertTrue(False) @@ -519,14 +460,14 @@ class TestPlasmaManager(unittest.TestCase): # should timeout since it does not exist. # TODO(rkn): Right now we must wait for the object table to be updated. while (not self.client2.contains(object_id1)) or (not self.client2.contains(object_id2)): - self.client2.fetch2(object_ids) + self.client2.fetch(object_ids) # Compare the buffers of the objects that do exist. assert_get_object_equal(self, self.client1, self.client2, object_id1, memory_buffer=memory_buffer1, metadata=metadata1) assert_get_object_equal(self, self.client1, self.client2, object_id2, memory_buffer=memory_buffer2, metadata=metadata2) # Fetch in the other direction. The fake object still does not exist. - self.client1.fetch2(object_ids) + self.client1.fetch(object_ids) assert_get_object_equal(self, self.client2, self.client1, object_id1, memory_buffer=memory_buffer1, metadata=metadata1) assert_get_object_equal(self, self.client2, self.client1, object_id2, @@ -534,12 +475,12 @@ class TestPlasmaManager(unittest.TestCase): # Check that we can call fetch with duplicated object IDs. object_id3 = random_object_id() - self.client1.fetch2([object_id3, object_id3]) + self.client1.fetch([object_id3, object_id3]) object_id4, memory_buffer4, metadata4 = create_object(self.client1, 2000, 2000) time.sleep(0.1) # TODO(rkn): Right now we must wait for the object table to be updated. while not self.client2.contains(object_id4): - self.client2.fetch2([object_id3, object_id3, object_id4, object_id4]) + self.client2.fetch([object_id3, object_id3, object_id4, object_id4]) assert_get_object_equal(self, self.client2, self.client1, object_id4, memory_buffer=memory_buffer4, metadata=metadata4) @@ -673,16 +614,16 @@ class TestPlasmaManager(unittest.TestCase): self.client2.seal(object_id) # Give the second manager some time to complete the seal, then make sure it # exited. - time_left = 10 + time_left = 100 while time_left > 0: if self.p5.poll() != None: self.processes_to_kill.remove(self.p5) break - time_left -= 0.2 - time.sleep(0.2) + time_left -= 0.1 + time.sleep(0.1) - print("Time waiting for plasma manager to fail = {:.2}".format(10 - time_left)) - self.assertNotEqual(self.p5.returncode, None) + print("Time waiting for plasma manager to fail = {:.2}".format(100 - time_left)) + self.assertNotEqual(self.p5.poll(), None) def test_illegal_functionality(self): # Create an object id string.