diff --git a/python/ray/worker.py b/python/ray/worker.py index 3b3b02d70..e1db62bf0 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -460,6 +460,7 @@ class Worker(object): object_ids (List[object_id.ObjectID]): A list of the object IDs whose values should be retrieved. """ + # Do an initial fetch for remote objects. self.plasma_client.fetch([object_id.id() for object_id in object_ids]) # Get the objects. We initially try to get the objects immediately. @@ -477,6 +478,9 @@ class Worker(object): while len(unready_ids) > 0: for unready_id in unready_ids: self.photon_client.reconstruct_object(unready_id) + # Do another fetch for objects that aren't available locally yet, in case + # they were evicted since the last fetch. + self.plasma_client.fetch(list(unready_ids.keys())) results = numbuf.retrieve_list(list(unready_ids.keys()), self.plasma_client.conn, GET_TIMEOUT_MILLISECONDS) diff --git a/src/common/redis_module/ray_redis_module.c b/src/common/redis_module/ray_redis_module.c index 94fc779c1..949918317 100644 --- a/src/common/redis_module/ray_redis_module.c +++ b/src/common/redis_module/ray_redis_module.c @@ -459,13 +459,11 @@ int ObjectTableRemove_RedisCommand(RedisModuleCtx *ctx, /** * Request notifications about the presence of some object IDs. This command - * takes a list of object IDs. There will be an immediate reply acknowledging - * the call and containing a list of all the object IDs that are already - * present in the object table along with vectors of the plasma managers that - * contain each object. For each object ID that is not already present in the - * object table, there will be a separate subsequent reply that returns the list - * of manager vectors conaining the object ID, and this will be called as soon - * as the object is added to the object table. + * takes a list of object IDs. For each object ID, the reply will be the list + * of plasma managers that contain the object. 
If the list of plasma managers + * is currently nonempty, then the reply will happen immediately. Else, the + * reply will come later, on the first invocation of `RAY.OBJECT_TABLE_ADD` + * following this call. * * This is called from a client with the command: * diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c index 2c951bb35..ef3deeae5 100644 --- a/src/plasma/plasma_manager.c +++ b/src/plasma/plasma_manager.c @@ -726,6 +726,27 @@ void process_transfer_request(event_loop *loop, const char *addr, int port, client_connection *conn) { + client_connection *manager_conn = + get_manager_connection(conn->manager_state, addr, port); + + /* If there is already a request in the transfer queue with the same object + * ID, do not add the transfer request. */ + plasma_request_buffer *pending; + LL_FOREACH(manager_conn->transfer_queue, pending) { + if (object_ids_equal(pending->object_id, obj_id) && + (pending->type == MessageType_PlasmaDataReply)) { + return; + } + } + + /* If we already have a connection to this manager and it is inactive, + * (re)register it with the event loop again. */ + if (manager_conn->transfer_queue == NULL) { + event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE, + send_queued_request, manager_conn); + } + + /* Allocate and append the request to the transfer queue. */ uint8_t *data; int64_t data_size; uint8_t *metadata; @@ -761,23 +782,6 @@ void process_transfer_request(event_loop *loop, buf->data_size = obj_buffer.data_size; buf->metadata_size = obj_buffer.metadata_size; - client_connection *manager_conn = - get_manager_connection(conn->manager_state, addr, port); - - if (manager_conn->transfer_queue == NULL) { - /* If we already have a connection to this manager and its inactive, - * (re)register it with the event loop again. 
*/ - event_loop_add_file(loop, manager_conn->fd, EVENT_LOOP_WRITE, - send_queued_request, manager_conn); - } - /* Add this transfer request to this connection's transfer queue if there - * isn't already a request with the same object ID. */ - plasma_request_buffer *pending; - LL_FOREACH(manager_conn->transfer_queue, pending) { - if (object_ids_equal(pending->object_id, buf->object_id)) { - return; - } - } LL_APPEND(manager_conn->transfer_queue, buf); } @@ -1253,7 +1257,7 @@ void process_delete_object_notification(plasma_manager_state *state, retry_info retry = { .num_retries = NUM_RETRIES, .timeout = MANAGER_TIMEOUT, - .fail_callback = NULL, + .fail_callback = fatal_table_callback, }; object_table_remove(state->db, obj_id, NULL, &retry, NULL, NULL); } @@ -1280,7 +1284,7 @@ void process_add_object_notification(plasma_manager_state *state, retry_info retry = { .num_retries = NUM_RETRIES, .timeout = MANAGER_TIMEOUT, - .fail_callback = NULL, + .fail_callback = fatal_table_callback, }; object_table_add(state->db, obj_id, object_info.data_size + object_info.metadata_size, diff --git a/test/stress_tests.py b/test/stress_tests.py index 71598a588..667a58551 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -154,7 +154,7 @@ class ReconstructionTests(unittest.TestCase): ray.worker._init(address_info=address_info, start_ray_local=True, num_workers=self.num_local_schedulers, num_local_schedulers=self.num_local_schedulers, - num_cpus=100) + num_cpus=[1] * self.num_local_schedulers) def tearDown(self): self.assertTrue(ray.services.all_processes_alive())