From 4ec5bea03b32c2f5ac9cf2d36e3877dbd32785b0 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 9 Jun 2018 23:36:27 -0700 Subject: [PATCH] [xray] Implement fetch (#2195) --- python/ray/worker.py | 14 +++- .../format/local_scheduler.fbs | 14 ++-- ...ay_spi_impl_DefaultLocalSchedulerClient.cc | 2 +- .../lib/python/local_scheduler_extension.cc | 30 ++++++-- src/local_scheduler/local_scheduler.cc | 12 +-- src/local_scheduler/local_scheduler_client.cc | 13 ++-- src/local_scheduler/local_scheduler_client.h | 11 ++- .../test/local_scheduler_tests.cc | 9 ++- src/ray/raylet/format/node_manager.fbs | 14 ++-- src/ray/raylet/node_manager.cc | 76 ++++++++++--------- test/stress_tests.py | 4 +- 11 files changed, 120 insertions(+), 79 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 00a80ec68..76db62ca6 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -449,7 +449,9 @@ class Worker(object): self.plasma_client.fetch(plain_object_ids[i:( i + ray._config.worker_fetch_request_size())]) else: - print("plasma_client.fetch has not been implemented yet") + self.local_scheduler_client.reconstruct_objects( + object_ids[i:( + i + ray._config.worker_fetch_request_size())], True) # Get the objects. We initially try to get the objects immediately. final_results = self.retrieve_and_deserialize(plain_object_ids, 0) @@ -466,20 +468,26 @@ class Worker(object): # repeat. while len(unready_ids) > 0: for unready_id in unready_ids: - self.local_scheduler_client.reconstruct_object(unready_id) + self.local_scheduler_client.reconstruct_objects( + [ray.ObjectID(unready_id)], False) # Do another fetch for objects that aren't available locally yet, # in case they were evicted since the last fetch. We divide the # fetch into smaller fetches so as to not block the manager for a # prolonged period of time in a single call. object_ids_to_fetch = list( map(plasma.ObjectID, unready_ids.keys())) + ray_object_ids_to_fetch = list( + map(ray.ObjectID, unready_ids.keys())) for i in range(0, len(object_ids_to_fetch), ray._config.worker_fetch_request_size()): if not self.use_raylet: self.plasma_client.fetch(object_ids_to_fetch[i:( i + ray._config.worker_fetch_request_size())]) else: - print("plasma_client.fetch has not been implemented yet") + self.local_scheduler_client.reconstruct_objects( + ray_object_ids_to_fetch[i:( + i + ray._config.worker_fetch_request_size())], + True) results = self.retrieve_and_deserialize( object_ids_to_fetch, max([ diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index 9cb923c30..8d7054e5e 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -26,9 +26,9 @@ enum MessageType:int { // Tell a worker to execute a task. This is sent from a local scheduler to a // worker. ExecuteTask, - // Reconstruct a possibly lost object. This is sent from a worker to a local - // scheduler. - ReconstructObject, + // Reconstruct or fetch possibly lost objects. This is sent from a worker to + // a local scheduler. + ReconstructObjects, // For a worker that was blocked on some object(s), tell the local scheduler // that the worker is now unblocked. This is sent from a worker to a local // scheduler. @@ -84,9 +84,11 @@ table RegisterClientRequest { table DisconnectClient { } -table ReconstructObject { - // Object ID of the object that needs to be reconstructed. - object_id: string; +table ReconstructObjects { + // List of object IDs of the objects that we want to reconstruct or fetch. + object_ids: [string]; + // Do we only want to fetch the objects or also reconstruct them? + fetch_only: bool; } table PutObject { diff --git a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc index d5cd92eb5..dfd47adbc 100644 --- a/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc +++ b/src/local_scheduler/lib/java/org_ray_spi_impl_DefaultLocalSchedulerClient.cc @@ -188,7 +188,7 @@ Java_org_ray_spi_impl_DefaultLocalSchedulerClient__1reconstruct_1object( // objectId); UniqueIdFromJByteArray o(env, oid); auto client = reinterpret_cast(c); - local_scheduler_reconstruct_object(client, *o.PID); + local_scheduler_reconstruct_objects(client, {*o.PID}); } /* diff --git a/src/local_scheduler/lib/python/local_scheduler_extension.cc b/src/local_scheduler/lib/python/local_scheduler_extension.cc index 89ae40259..3645c3111 100644 --- a/src/local_scheduler/lib/python/local_scheduler_extension.cc +++ b/src/local_scheduler/lib/python/local_scheduler_extension.cc @@ -81,14 +81,28 @@ static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) { } // clang-format on -static PyObject *PyLocalSchedulerClient_reconstruct_object(PyObject *self, - PyObject *args) { - ObjectID object_id; - if (!PyArg_ParseTuple(args, "O&", PyStringToUniqueID, &object_id)) { +static PyObject *PyLocalSchedulerClient_reconstruct_objects(PyObject *self, + PyObject *args) { + PyObject *py_object_ids; + PyObject *py_fetch_only; + std::vector object_ids; + if (!PyArg_ParseTuple(args, "OO", &py_object_ids, &py_fetch_only)) { return NULL; } - local_scheduler_reconstruct_object( - ((PyLocalSchedulerClient *) self)->local_scheduler_connection, object_id); + bool fetch_only = PyObject_IsTrue(py_fetch_only); + Py_ssize_t n = PyList_Size(py_object_ids); + for (int64_t i = 0; i < n; ++i) { + ObjectID object_id; + PyObject *py_object_id = PyList_GetItem(py_object_ids, i); + if (!PyObjectToUniqueID(py_object_id, &object_id)) { + return NULL; + } + object_ids.push_back(object_id); + } + local_scheduler_reconstruct_objects( + reinterpret_cast(self) + ->local_scheduler_connection, + object_ids, fetch_only); Py_RETURN_NONE; } @@ -238,8 +252,8 @@ static PyMethodDef PyLocalSchedulerClient_methods[] = { "Submit a task to the local scheduler."}, {"get_task", (PyCFunction) PyLocalSchedulerClient_get_task, METH_NOARGS, "Get a task from the local scheduler."}, - {"reconstruct_object", - (PyCFunction) PyLocalSchedulerClient_reconstruct_object, METH_VARARGS, + {"reconstruct_objects", + (PyCFunction) PyLocalSchedulerClient_reconstruct_objects, METH_VARARGS, "Ask the local scheduler to reconstruct an object."}, {"log_event", (PyCFunction) PyLocalSchedulerClient_log_event, METH_VARARGS, "Log an event to the event log through the local scheduler."}, diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 1e659db69..4ee3b2ee5 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -1163,10 +1163,10 @@ void process_message(event_loop *loop, handle_actor_worker_available(state, state->algorithm_state, worker); } } break; - case static_cast(MessageType::ReconstructObject): { - auto message = - flatbuffers::GetRoot( - input); + case static_cast(MessageType::ReconstructObjects): { + auto message = flatbuffers::GetRoot< + ray::local_scheduler::protocol::ReconstructObjects>(input); + RAY_CHECK(!message->fetch_only()); if (worker->task_in_progress != NULL && !worker->is_blocked) { /* If the worker was executing a task (i.e. non-driver) and it wasn't * already blocked on an object that's not locally available, update its @@ -1190,7 +1190,9 @@ void process_message(event_loop *loop, } print_worker_info("Reconstructing", state->algorithm_state); } - reconstruct_object(state, from_flatbuf(*message->object_id())); + RAY_CHECK(message->object_ids()->size() == 1); + ObjectID object_id = from_flatbuf(*message->object_ids()->Get(0)); + reconstruct_object(state, object_id); } break; case static_cast(CommonMessageType::DISCONNECT_CLIENT): { RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock; diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 89fe3fd33..8d838e1e8 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -138,14 +138,17 @@ void local_scheduler_task_done(LocalSchedulerConnection *conn) { NULL); } -void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn, - ObjectID object_id) { +void local_scheduler_reconstruct_objects( + LocalSchedulerConnection *conn, + const std::vector &object_ids, + bool fetch_only) { flatbuffers::FlatBufferBuilder fbb; - auto message = ray::local_scheduler::protocol::CreateReconstructObject( - fbb, to_flatbuf(fbb, object_id)); + auto object_ids_message = to_flatbuf(fbb, object_ids); + auto message = ray::local_scheduler::protocol::CreateReconstructObjects( + fbb, object_ids_message, fetch_only); fbb.Finish(message); write_message(conn->conn, - static_cast(MessageType::ReconstructObject), + static_cast(MessageType::ReconstructObjects), fbb.GetSize(), fbb.GetBufferPointer()); /* TODO(swang): Propagate the error. */ } diff --git a/src/local_scheduler/local_scheduler_client.h b/src/local_scheduler/local_scheduler_client.h index ed6f3916e..5608c5aa8 100644 --- a/src/local_scheduler/local_scheduler_client.h +++ b/src/local_scheduler/local_scheduler_client.h @@ -111,14 +111,17 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, void local_scheduler_task_done(LocalSchedulerConnection *conn); /** - * Tell the local scheduler to reconstruct an object. + * Tell the local scheduler to reconstruct or fetch objects. * * @param conn The connection information. - * @param object_id The ID of the object to reconstruct. + * @param object_ids The IDs of the objects to reconstruct. + * @param fetch_only Only fetch objects, do not reconstruct them. * @return Void. */ -void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn, - ObjectID object_id); +void local_scheduler_reconstruct_objects( + LocalSchedulerConnection *conn, + const std::vector &object_ids, + bool fetch_only = false); /** * Send a log message to the local scheduler. diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index d1071d912..dfcf0053f 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -244,7 +244,8 @@ TEST object_reconstruction_test(void) { /* Trigger reconstruction, and run the event loop again. */ ObjectID return_id = TaskSpec_return(spec, 0); - local_scheduler_reconstruct_object(worker, return_id); + local_scheduler_reconstruct_objects(worker, + std::vector({return_id})); event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); event_loop_run(local_scheduler->loop); @@ -369,7 +370,8 @@ TEST object_reconstruction_recursive_test(void) { } /* Trigger reconstruction for the last object. */ ObjectID return_id = TaskSpec_return(specs[NUM_TASKS - 1].Spec(), 0); - local_scheduler_reconstruct_object(worker, return_id); + local_scheduler_reconstruct_objects(worker, + std::vector({return_id})); /* Run the event loop again. All tasks should be resubmitted. */ event_loop_add_timer(local_scheduler->loop, 500, (event_loop_timer_handler) timeout_handler, NULL); @@ -437,7 +439,8 @@ TEST object_reconstruction_suppression_test(void) { 0); /* Trigger a reconstruction. We will check that no tasks get queued as a * result of this line in the event loop process. */ - local_scheduler_reconstruct_object(worker, return_id); + local_scheduler_reconstruct_objects(worker, + std::vector({return_id})); /* Clean up. */ free(task_assigned); LocalSchedulerMock_free(local_scheduler); diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index ba3dda4cb..ca25120eb 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -29,9 +29,9 @@ enum MessageType:int { // Tell a worker to execute a task. This is sent from a local scheduler to a // worker. ExecuteTask, - // Reconstruct a possibly lost object. This is sent from a worker to a local - // scheduler. - ReconstructObject, + // Reconstruct or fetch possibly lost objects. This is sent from a worker to + // a local scheduler. + ReconstructObjects, // For a worker that was blocked on some object(s), tell the local scheduler // that the worker is now unblocked. This is sent from a worker to a local // scheduler. @@ -118,9 +118,11 @@ table ForwardTaskRequest { uncommitted_tasks: [Task]; } -table ReconstructObject { - // Object ID of the object that needs to be reconstructed. - object_id: string; +table ReconstructObjects { + // List of object IDs of the objects that we want to reconstruct or fetch. + object_ids: [string]; + // Do we only want to fetch the objects or also reconstruct them? + fetch_only: bool; } table WaitRequest { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ad2049887..c05742dfb 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -28,8 +28,8 @@ RAY_CHECK_ENUM(protocol::MessageType::GetTask, local_scheduler_protocol::MessageType::GetTask); RAY_CHECK_ENUM(protocol::MessageType::ExecuteTask, local_scheduler_protocol::MessageType::ExecuteTask); -RAY_CHECK_ENUM(protocol::MessageType::ReconstructObject, - local_scheduler_protocol::MessageType::ReconstructObject); +RAY_CHECK_ENUM(protocol::MessageType::ReconstructObjects, + local_scheduler_protocol::MessageType::ReconstructObjects); RAY_CHECK_ENUM(protocol::MessageType::NotifyUnblocked, local_scheduler_protocol::MessageType::NotifyUnblocked); RAY_CHECK_ENUM(protocol::MessageType::PutObject, @@ -391,43 +391,47 @@ void NodeManager::ProcessClientMessage( // locally, there is no uncommitted lineage. SubmitTask(task, Lineage()); } break; - case protocol::MessageType::ReconstructObject: { + case protocol::MessageType::ReconstructObjects: { // TODO(hme): handle multiple object ids. - auto message = flatbuffers::GetRoot(message_data); - ObjectID object_id = from_flatbuf(*message->object_id()); - RAY_LOG(DEBUG) << "reconstructing object " << object_id; - if (!task_dependency_manager_.CheckObjectLocal(object_id)) { - // TODO(swang): Instead of calling Pull on the object directly, record the - // fact that the blocked task is dependent on this object_id in the task - // dependency manager. - RAY_CHECK_OK(object_manager_.Pull(object_id)); - } + auto message = flatbuffers::GetRoot(message_data); + for (size_t i = 0; i < message->object_ids()->size(); ++i) { + ObjectID object_id = from_flatbuf(*message->object_ids()->Get(i)); + RAY_LOG(DEBUG) << "reconstructing object " << object_id; + if (!task_dependency_manager_.CheckObjectLocal(object_id)) { + // TODO(swang): Instead of calling Pull on the object directly, record the + // fact that the blocked task is dependent on this object_id in the task + // dependency manager. + RAY_CHECK_OK(object_manager_.Pull(object_id)); + } - // If the blocked client is a worker, and the worker isn't already blocked, - // then release any CPU resources that it acquired for its assigned task - // while it is blocked. The resources will be acquired again once the - // worker is unblocked. - std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); - if (worker && !worker->IsBlocked()) { - RAY_CHECK(!worker->GetAssignedTaskId().is_nil()); - auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()}); - const auto &task = tasks.front(); - // Get the CPU resources required by the running task. - const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); - double required_cpus = required_resources.GetNumCpus(); - const std::unordered_map cpu_resources = { - {kCPU_ResourceLabel, required_cpus}}; - // Release the CPU resources. - RAY_CHECK( - cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( - ResourceSet(cpu_resources))); - // Mark the task as blocked. - local_queues_.QueueBlockedTasks(tasks); - worker->MarkBlocked(); + if (!message->fetch_only()) { + // If the blocked client is a worker, and the worker isn't already blocked, + // then release any CPU resources that it acquired for its assigned task + // while it is blocked. The resources will be acquired again once the + // worker is unblocked. + std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); + if (worker && !worker->IsBlocked()) { + RAY_CHECK(!worker->GetAssignedTaskId().is_nil()); + auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()}); + const auto &task = tasks.front(); + // Get the CPU resources required by the running task. + const auto required_resources = + task.GetTaskSpecification().GetRequiredResources(); + double required_cpus = required_resources.GetNumCpus(); + const std::unordered_map cpu_resources = { + {kCPU_ResourceLabel, required_cpus}}; + // Release the CPU resources. + RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] + .Release(ResourceSet(cpu_resources))); + // Mark the task as blocked. + local_queues_.QueueBlockedTasks(tasks); + worker->MarkBlocked(); - // Try to dispatch more tasks since the blocked worker released some - // resources. - DispatchTasks(); + // Try to dispatch more tasks since the blocked worker released some + // resources. + DispatchTasks(); + } + } } } break; case protocol::MessageType::NotifyUnblocked: { diff --git a/test/stress_tests.py b/test/stress_tests.py index fd40b1e52..b929110b3 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -492,8 +492,8 @@ class ReconstructionTests(unittest.TestCase): # were evicted and whose originating tasks are still running, this # for-loop should hang on its first iteration and push an error to the # driver. - ray.worker.global_worker.local_scheduler_client.reconstruct_object( - args[0].id()) + ray.worker.global_worker.local_scheduler_client.reconstruct_objects( + [args[0]], False) def error_check(errors): return len(errors) > 1