From 247f95b3ff7f821429b1a5a86b1c19a2aa98acc2 Mon Sep 17 00:00:00 2001 From: Qing Wang Date: Tue, 2 Jul 2019 14:26:19 +0800 Subject: [PATCH] Refine RegisterClientRequest message to make it clearer. (#5057) * transfor driver task id Explicitly * Refins * Fix and add comment. * add more * Fix * Fix * Add comments * Fix --- src/ray/common/id.cc | 2 +- src/ray/common/id.h | 2 +- src/ray/raylet/format/node_manager.fbs | 7 +++---- src/ray/raylet/node_manager.cc | 15 ++++++--------- 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 2379a22fd..0735d4d00 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -85,7 +85,7 @@ uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) { return h; } -TaskID TaskID::GetDriverTaskID(const WorkerID &driver_id) { +TaskID TaskID::ComputeDriverTaskId(const WorkerID &driver_id) { std::string driver_id_str = driver_id.Binary(); driver_id_str.resize(Size()); return TaskID::FromBinary(driver_id_str); diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 09f4a16aa..667e55769 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -72,7 +72,7 @@ class TaskID : public BaseID { public: TaskID() : BaseID() {} static size_t Size() { return kTaskIDSize; } - static TaskID GetDriverTaskID(const WorkerID &driver_id); + static TaskID ComputeDriverTaskId(const WorkerID &driver_id); private: uint8_t id_[kTaskIDSize]; diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index aba7ab8cf..d5f5f2c7c 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -131,12 +131,11 @@ table RegisterClientRequest { // True if the client is a worker and false if the client is a driver. is_worker: bool; // The ID of the worker or driver. - client_id: string; + worker_id: string; // The process ID of this worker. worker_pid: long; - // The driver ID. This is non-nil if the client is a driver. - // TODO(qwang): rename this to driver_task_id. - driver_id: string; + // The job ID if the client is a driver, otherwise it should be NIL. + job_id: string; // Language of this worker. language: Language; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8e2cdf684..35da34fe5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -844,7 +844,7 @@ void NodeManager::ProcessClientMessage( void NodeManager::ProcessRegisterClientRequestMessage( const std::shared_ptr &client, const uint8_t *message_data) { auto message = flatbuffers::GetRoot(message_data); - client->SetClientID(from_flatbuf(*message->client_id())); + client->SetClientID(from_flatbuf(*message->worker_id())); auto worker = std::make_shared(message->worker_pid(), message->language(), client); if (message->is_worker()) { @@ -852,15 +852,12 @@ void NodeManager::ProcessRegisterClientRequestMessage( worker_pool_.RegisterWorker(std::move(worker)); DispatchTasks(local_queues_.GetReadyTasksWithResources()); } else { - // Register the new driver. Note that here the driver_id in RegisterClientRequest - // message is actually the ID of the driver task, while client_id represents the - // real driver ID, which can associate all the tasks/actors for a given driver, - // which is set to the worker ID. - // TODO(qwang): Use driver_task_id instead here. - const WorkerID driver_id = from_flatbuf(*message->driver_id()); - TaskID driver_task_id = TaskID::GetDriverTaskID(driver_id); + // Register the new driver. + const WorkerID driver_id = from_flatbuf(*message->worker_id()); + // Compute a dummy driver task id from a given driver. + const TaskID driver_task_id = TaskID::ComputeDriverTaskId(driver_id); worker->AssignTaskId(driver_task_id); - worker->AssignJobId(from_flatbuf(*message->client_id())); + worker->AssignJobId(from_flatbuf(*message->job_id())); worker_pool_.RegisterDriver(std::move(worker)); local_queues_.AddDriverTaskId(driver_task_id); }