Refine RegisterClientRequest message to make it clearer. (#5057)

* transfor driver task id Explicitly

* Refins

* Fix and add comment.

* add more

* Fix

* Fix

* Add comments

* Fix
This commit is contained in:
Qing Wang
2019-07-02 14:26:19 +08:00
committed by GitHub
parent a6a02fccd0
commit 247f95b3ff
4 changed files with 11 additions and 15 deletions
+1 -1
View File
@@ -85,7 +85,7 @@ uint64_t MurmurHash64A(const void *key, int len, unsigned int seed) {
return h;
}
TaskID TaskID::GetDriverTaskID(const WorkerID &driver_id) {
TaskID TaskID::ComputeDriverTaskId(const WorkerID &driver_id) {
std::string driver_id_str = driver_id.Binary();
driver_id_str.resize(Size());
return TaskID::FromBinary(driver_id_str);
+1 -1
View File
@@ -72,7 +72,7 @@ class TaskID : public BaseID<TaskID> {
public:
TaskID() : BaseID() {}
static size_t Size() { return kTaskIDSize; }
static TaskID GetDriverTaskID(const WorkerID &driver_id);
static TaskID ComputeDriverTaskId(const WorkerID &driver_id);
private:
uint8_t id_[kTaskIDSize];
+3 -4
View File
@@ -131,12 +131,11 @@ table RegisterClientRequest {
// True if the client is a worker and false if the client is a driver.
is_worker: bool;
// The ID of the worker or driver.
client_id: string;
worker_id: string;
// The process ID of this worker.
worker_pid: long;
// The driver ID. This is non-nil if the client is a driver.
// TODO(qwang): rename this to driver_task_id.
driver_id: string;
// The job ID if the client is a driver, otherwise it should be NIL.
job_id: string;
// Language of this worker.
language: Language;
}
+6 -9
View File
@@ -844,7 +844,7 @@ void NodeManager::ProcessClientMessage(
void NodeManager::ProcessRegisterClientRequestMessage(
const std::shared_ptr<LocalClientConnection> &client, const uint8_t *message_data) {
auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
client->SetClientID(from_flatbuf<ClientID>(*message->client_id()));
client->SetClientID(from_flatbuf<ClientID>(*message->worker_id()));
auto worker =
std::make_shared<Worker>(message->worker_pid(), message->language(), client);
if (message->is_worker()) {
@@ -852,15 +852,12 @@ void NodeManager::ProcessRegisterClientRequestMessage(
worker_pool_.RegisterWorker(std::move(worker));
DispatchTasks(local_queues_.GetReadyTasksWithResources());
} else {
// Register the new driver. Note that here the driver_id in RegisterClientRequest
// message is actually the ID of the driver task, while client_id represents the
// real driver ID, which can associate all the tasks/actors for a given driver,
// which is set to the worker ID.
// TODO(qwang): Use driver_task_id instead here.
const WorkerID driver_id = from_flatbuf<WorkerID>(*message->driver_id());
TaskID driver_task_id = TaskID::GetDriverTaskID(driver_id);
// Register the new driver.
const WorkerID driver_id = from_flatbuf<WorkerID>(*message->worker_id());
// Compute a dummy driver task id from a given driver.
const TaskID driver_task_id = TaskID::ComputeDriverTaskId(driver_id);
worker->AssignTaskId(driver_task_id);
worker->AssignJobId(from_flatbuf<JobID>(*message->client_id()));
worker->AssignJobId(from_flatbuf<JobID>(*message->job_id()));
worker_pool_.RegisterDriver(std::move(worker));
local_queues_.AddDriverTaskId(driver_task_id);
}