diff --git a/python/ray/includes/libraylet.pxd b/python/ray/includes/libraylet.pxd index 55b0fe148..93534eb3a 100644 --- a/python/ray/includes/libraylet.pxd +++ b/python/ray/includes/libraylet.pxd @@ -48,8 +48,6 @@ cdef extern from "ray/raylet/raylet_client.h" nogil: const CLanguage &language) CRayStatus Disconnect() CRayStatus SubmitTask(const CTaskSpec &task_spec) - CRayStatus GetTask(unique_ptr[CTaskSpec] *task_spec) - CRayStatus TaskDone() CRayStatus FetchOrReconstruct(c_vector[CObjectID] &object_ids, c_bool fetch_only, const CTaskID ¤t_task_id) diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index feb278c8a..e0d07f79f 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -25,9 +25,6 @@ enum MessageType:int { // Notify the raylet that this client is disconnecting gracefully. // This is sent from a worker to a raylet. IntentionalDisconnectClient, - // Get a new task from the raylet. This is sent from a worker to a - // raylet. - GetTask, // Tell a worker to execute a task. This is sent from a raylet to a // worker. ExecuteTask, @@ -119,14 +116,6 @@ table ResourceIdSetInfos { resource_infos: [ResourceIdSetInfo]; } -// This message is sent from the raylet to a worker. -table GetTaskReply { - // A string of bytes representing the task specification. - task_spec: string; - // A list of the resources reserved for this worker. - fractional_resource_ids: ResourceIdSetInfos; -} - // This struct is used to register a new worker with the raylet. // It is shipped as part of raylet_connect. table RegisterClientRequest { @@ -142,8 +131,6 @@ table RegisterClientRequest { // TODO(hchen): Use `Language` in `common.proto`. language: int; // Port that this worker is listening on. - // If port > 0, then worker will listen to this port and wait for - // raylet to push tasks, instead of invoking GetTask(). port: int; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2b0aae0da..9ccfd28e7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -867,12 +867,7 @@ void NodeManager::ProcessClientMessage( case protocol::MessageType::RegisterClientRequest: { ProcessRegisterClientRequestMessage(client, message_data); } break; - case protocol::MessageType::GetTask: { - RAY_CHECK(!registered_worker->UsePush()); - HandleWorkerAvailable(client); - } break; case protocol::MessageType::TaskDone: { - RAY_CHECK(registered_worker->UsePush()); HandleWorkerAvailable(client); } break; case protocol::MessageType::DisconnectClient: { @@ -956,12 +951,8 @@ void NodeManager::ProcessRegisterClientRequestMessage( Status status; if (message->is_worker()) { // Register the new worker. - bool use_push_task = worker->UsePush(); - auto connection = worker->Connection(); - status = worker_pool_.RegisterWorker(std::move(worker)); - if (status.ok() && use_push_task) { - // only call `HandleWorkerAvailable` when push mode is used. - HandleWorkerAvailable(connection); + if (worker_pool_.RegisterWorker(std::move(worker)).ok()) { + HandleWorkerAvailable(worker->Connection()); } } else { // Register the new driver. @@ -1903,22 +1894,18 @@ bool NodeManager::AssignTask(const Task &task) { auto task_id = spec.TaskId(); auto finish_assign_task_callback = [this, worker, task_id](Status status) { - if (worker->UsePush()) { - // NOTE: we cannot directly call `FinishAssignTask` here because - // it assumes the task is in SWAP queue, thus we need to delay invoking this - // function after the assigned tasks are moved from READY queue to SWAP queue - // in `DispatchTasks`. - // Another option is to move the tasks to SWAP queue here just before calling - // `FinishAssignTask` so we can save an io_service post, at the - // expense of calling `MoveTask` for each of the assigned tasks. - // TODO(zhijunfu): after all workers are fully migrated to push mode, the - // `post` below and swap queue can be removed. - io_service_.post([this, status, worker, task_id]() { - FinishAssignTask(task_id, *worker, status.ok()); - }); - } else { + // NOTE: we cannot directly call `FinishAssignTask` here because + // it assumes the task is in SWAP queue, thus we need to delay invoking this + // function after the assigned tasks are moved from READY queue to SWAP queue + // in `DispatchTasks`. + // Another option is to move the tasks to SWAP queue here just before calling + // `FinishAssignTask` so we can save an io_service post, at the + // expense of calling `MoveTask` for each of the assigned tasks. + // TODO(zhijunfu): after all workers are fully migrated to push mode, the + // `post` below and swap queue can be removed. + io_service_.post([this, status, worker, task_id]() { FinishAssignTask(task_id, *worker, status.ok()); - } + }); }; ResourceIdSet resource_id_set = diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index 029397559..e37a7dab3 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -227,45 +227,6 @@ ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) { return conn_->WriteMessage(MessageType::SubmitTask, &fbb); } -ray::Status RayletClient::GetTask(std::unique_ptr *task_spec) { - std::unique_ptr reply; - // Receive a task from the raylet. This will block until the raylet - // gives this client a task. - auto status = - conn_->AtomicRequestReply(MessageType::GetTask, MessageType::ExecuteTask, reply); - if (!status.ok()) return status; - // Parse the flatbuffer object. - auto reply_message = flatbuffers::GetRoot(reply.get()); - // Set the resource IDs for this task. - resource_ids_.clear(); - for (size_t i = 0; - i < reply_message->fractional_resource_ids()->resource_infos()->size(); ++i) { - auto const &fractional_resource_ids = - reply_message->fractional_resource_ids()->resource_infos()->Get(i); - auto &acquired_resources = - resource_ids_[string_from_flatbuf(*fractional_resource_ids->resource_name())]; - - size_t num_resource_ids = fractional_resource_ids->resource_ids()->size(); - size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size(); - RAY_CHECK(num_resource_ids == num_resource_fractions); - RAY_CHECK(num_resource_ids > 0); - for (size_t j = 0; j < num_resource_ids; ++j) { - int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j); - double resource_fraction = fractional_resource_ids->resource_fractions()->Get(j); - if (num_resource_ids > 1) { - int64_t whole_fraction = resource_fraction; - RAY_CHECK(whole_fraction == resource_fraction); - } - acquired_resources.push_back(std::make_pair(resource_id, resource_fraction)); - } - } - - // Return the copy of the task spec and pass ownership to the caller. - task_spec->reset( - new ray::TaskSpecification(string_from_flatbuf(*reply_message->task_spec()))); - return ray::Status::OK(); -} - ray::Status RayletClient::TaskDone() { return conn_->WriteMessage(MessageType::TaskDone); } diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index 71d0075b0..3e36c2078 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -84,14 +84,6 @@ class RayletClient { /// \return ray::Status. ray::Status SubmitTask(const ray::TaskSpecification &task_spec); - /// Get next task for this client. This will block until the scheduler assigns - /// a task to this worker. The caller takes ownership of the returned task - /// specification and must free it. - /// - /// \param task_spec The assigned task. - /// \return ray::Status. - ray::Status GetTask(std::unique_ptr *task_spec); - /// Tell the raylet that the client has finished executing a task. /// /// \return ray::Status. diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index dc2b223d7..c4d66971c 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -121,52 +121,32 @@ void Worker::SetActiveObjectIds(const std::unordered_set &&object_ids) active_object_ids_ = object_ids; } -bool Worker::UsePush() const { return rpc_client_ != nullptr; } - void Worker::AssignTask(const Task &task, const ResourceIdSet &resource_id_set, const std::function finish_assign_callback) { - const TaskSpecification &spec = task.GetTaskSpecification(); - if (rpc_client_ != nullptr) { - // Use push mode. - RAY_CHECK(port_ > 0); - rpc::AssignTaskRequest request; - request.mutable_task()->mutable_task_spec()->CopyFrom( - task.GetTaskSpecification().GetMessage()); - request.mutable_task()->mutable_task_execution_spec()->CopyFrom( - task.GetTaskExecutionSpec().GetMessage()); - request.set_resource_ids(resource_id_set.Serialize()); + RAY_CHECK(port_ > 0); + rpc::AssignTaskRequest request; + request.mutable_task()->mutable_task_spec()->CopyFrom( + task.GetTaskSpecification().GetMessage()); + request.mutable_task()->mutable_task_execution_spec()->CopyFrom( + task.GetTaskExecutionSpec().GetMessage()); + request.set_resource_ids(resource_id_set.Serialize()); - auto status = rpc_client_->AssignTask(request, [](Status status, - const rpc::AssignTaskReply &reply) { - if (!status.ok()) { - RAY_LOG(DEBUG) << "Worker failed to finish executing task: " << status.ToString(); - } - // Worker has finished this task. There's nothing to do here - // and assigning new task will be done when raylet receives - // `TaskDone` message. - }); - finish_assign_callback(status); + auto status = rpc_client_->AssignTask(request, [](Status status, + const rpc::AssignTaskReply &reply) { if (!status.ok()) { - RAY_LOG(ERROR) << "Failed to assign task " << task.GetTaskSpecification().TaskId() - << " to worker " << worker_id_; - } else { - RAY_LOG(DEBUG) << "Assigned task " << task.GetTaskSpecification().TaskId() - << " to worker " << worker_id_; + RAY_LOG(DEBUG) << "Worker failed to finish executing task: " << status.ToString(); } + // Worker has finished this task. There's nothing to do here + // and assigning new task will be done when raylet receives + // `TaskDone` message. + }); + finish_assign_callback(status); + if (!status.ok()) { + RAY_LOG(ERROR) << "Failed to assign task " << task.GetTaskSpecification().TaskId() + << " to worker " << worker_id_; } else { - // Use pull mode. This corresponds to existing python/java workers that haven't been - // migrated to core worker architecture. - flatbuffers::FlatBufferBuilder fbb; - auto resource_id_set_flatbuf = resource_id_set.ToFlatbuf(fbb); - - auto message = - protocol::CreateGetTaskReply(fbb, fbb.CreateString(spec.Serialize()), - protocol::CreateResourceIdSetInfos( - fbb, fbb.CreateVector(resource_id_set_flatbuf))); - fbb.Finish(message); - Connection()->WriteMessageAsync( - static_cast(protocol::MessageType::ExecuteTask), fbb.GetSize(), - fbb.GetBufferPointer(), finish_assign_callback); + RAY_LOG(DEBUG) << "Assigned task " << task.GetTaskSpecification().TaskId() + << " to worker " << worker_id_; } } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 79642527e..45e7b5e7b 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -60,7 +60,6 @@ class Worker { const std::unordered_set &GetActiveObjectIds() const; void SetActiveObjectIds(const std::unordered_set &&object_ids); - bool UsePush() const; void AssignTask(const Task &task, const ResourceIdSet &resource_id_set, const std::function finish_assign_callback);