From 451cdb43f65fb77240f4ed78b39d4d8a89ad04fa Mon Sep 17 00:00:00 2001 From: songqing Date: Tue, 5 Jun 2018 15:08:05 +0800 Subject: [PATCH] Fix redefinition of flatbuffer types (#2189) --- .../format/local_scheduler.fbs | 1 + src/local_scheduler/local_scheduler.cc | 80 +++++++++------- src/local_scheduler/local_scheduler_client.cc | 95 +++++++++++-------- src/ray/raylet/node_manager.cc | 40 +++++--- 4 files changed, 133 insertions(+), 83 deletions(-) diff --git a/src/local_scheduler/format/local_scheduler.fbs b/src/local_scheduler/format/local_scheduler.fbs index 67f3077c3..9cb923c30 100644 --- a/src/local_scheduler/format/local_scheduler.fbs +++ b/src/local_scheduler/format/local_scheduler.fbs @@ -1,4 +1,5 @@ // Local scheduler protocol specification +namespace ray.local_scheduler.protocol; enum MessageType:int { // Task is submitted to the local scheduler. This is sent from a worker to a diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 26408957f..6c00893c4 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -528,13 +528,14 @@ void assign_task_to_worker(LocalSchedulerState *state, /* Construct a flatbuffer object to send to the worker. */ flatbuffers::FlatBufferBuilder fbb; - auto message = - CreateGetTaskReply(fbb, fbb.CreateString((char *) spec, task_spec_size), - fbb.CreateVector(worker->gpus_in_use)); + auto message = ray::local_scheduler::protocol::CreateGetTaskReply( + fbb, fbb.CreateString((char *) spec, task_spec_size), + fbb.CreateVector(worker->gpus_in_use)); fbb.Finish(message); - if (write_message(worker->sock, MessageType_ExecuteTask, fbb.GetSize(), - (uint8_t *) fbb.GetBufferPointer()) < 0) { + if (write_message(worker->sock, + ray::local_scheduler::protocol::MessageType_ExecuteTask, + fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) { if (errno == EPIPE || errno == EBADF) { /* Something went wrong, so kill the worker. */ kill_worker(state, worker, false, false); @@ -923,9 +924,10 @@ void reconstruct_object(LocalSchedulerState *state, reconstruct_object_lookup_callback, (void *) state); } -void handle_client_register(LocalSchedulerState *state, - LocalSchedulerClient *worker, - const RegisterClientRequest *message) { +void handle_client_register( + LocalSchedulerState *state, + LocalSchedulerClient *worker, + const ray::local_scheduler::protocol::RegisterClientRequest *message) { /* Make sure this worker hasn't already registered. */ RAY_CHECK(!worker->registered); worker->registered = true; @@ -1029,13 +1031,15 @@ void handle_get_actor_frontier(LocalSchedulerState *state, frontier_vector.push_back(frontier[handle.first]); } flatbuffers::FlatBufferBuilder fbb; - auto reply = CreateActorFrontier( + auto reply = ray::local_scheduler::protocol::CreateActorFrontier( fbb, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, handle_vector), fbb.CreateVector(task_counter_vector), to_flatbuf(fbb, frontier_vector)); fbb.Finish(reply); /* Respond with the built ActorFrontier. */ - if (write_message(worker->sock, MessageType_GetActorFrontierReply, - fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) { + if (write_message( + worker->sock, + ray::local_scheduler::protocol::MessageType_GetActorFrontierReply, + fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) { if (errno == EPIPE || errno == EBADF) { /* Something went wrong, so kill the worker. */ kill_worker(state, worker, false, false); @@ -1047,9 +1051,10 @@ void handle_get_actor_frontier(LocalSchedulerState *state, } } -void handle_set_actor_frontier(LocalSchedulerState *state, - LocalSchedulerClient *worker, - ActorFrontier const &frontier) { +void handle_set_actor_frontier( + LocalSchedulerState *state, + LocalSchedulerClient *worker, + ray::local_scheduler::protocol::ActorFrontier const &frontier) { /* Parse the ActorFrontier flatbuffer. */ ActorID actor_id = from_flatbuf(*frontier.actor_id()); std::unordered_map task_counters; @@ -1082,8 +1087,10 @@ void process_message(event_loop *loop, RAY_LOG(DEBUG) << "New event of type " << type; switch (type) { - case MessageType_SubmitTask: { - auto message = flatbuffers::GetRoot(input); + case ray::local_scheduler::protocol::MessageType_SubmitTask: { + auto message = + flatbuffers::GetRoot( + input); TaskExecutionSpec execution_spec = TaskExecutionSpec(from_flatbuf(*message->execution_dependencies()), (TaskSpec *) message->task_spec()->data(), @@ -1110,9 +1117,9 @@ void process_message(event_loop *loop, execution_spec); } } break; - case MessageType_TaskDone: { + case ray::local_scheduler::protocol::MessageType_TaskDone: { } break; - case MessageType_DisconnectClient: { + case ray::local_scheduler::protocol::MessageType_DisconnectClient: { finish_task(state, worker); RAY_CHECK(!worker->disconnected); worker->disconnected = true; @@ -1122,9 +1129,11 @@ void process_message(event_loop *loop, start_worker(state); } } break; - case MessageType_EventLogMessage: { + case ray::local_scheduler::protocol::MessageType_EventLogMessage: { /* Parse the message. */ - auto message = flatbuffers::GetRoot(input); + auto message = + flatbuffers::GetRoot( + input); if (state->db != NULL) { RayLogger_log_event(state->db, (uint8_t *) message->key()->data(), message->key()->size(), @@ -1132,11 +1141,12 @@ void process_message(event_loop *loop, message->value()->size(), message->timestamp()); } } break; - case MessageType_RegisterClientRequest: { - auto message = flatbuffers::GetRoot(input); + case ray::local_scheduler::protocol::MessageType_RegisterClientRequest: { + auto message = flatbuffers::GetRoot< + ray::local_scheduler::protocol::RegisterClientRequest>(input); handle_client_register(state, worker, message); } break; - case MessageType_GetTask: { + case ray::local_scheduler::protocol::MessageType_GetTask: { /* If this worker reports a completed task, account for resources. */ finish_task(state, worker); /* Let the scheduling algorithm process the fact that there is an available @@ -1147,8 +1157,10 @@ void process_message(event_loop *loop, handle_actor_worker_available(state, state->algorithm_state, worker); } } break; - case MessageType_ReconstructObject: { - auto message = flatbuffers::GetRoot(input); + case ray::local_scheduler::protocol::MessageType_ReconstructObject: { + auto message = + flatbuffers::GetRoot( + input); if (worker->task_in_progress != NULL && !worker->is_blocked) { /* If the worker was executing a task (i.e. non-driver) and it wasn't * already blocked on an object that's not locally available, update its @@ -1178,7 +1190,7 @@ void process_message(event_loop *loop, RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock; handle_client_disconnect(state, worker); } break; - case MessageType_NotifyUnblocked: { + case ray::local_scheduler::protocol::MessageType_NotifyUnblocked: { /* TODO(rkn): A driver may call this as well, right? */ if (worker->task_in_progress != NULL) { /* If the worker was executing a task (i.e. non-driver), update its @@ -1206,18 +1218,22 @@ void process_message(event_loop *loop, } print_worker_info("Worker unblocked", state->algorithm_state); } break; - case MessageType_PutObject: { - auto message = flatbuffers::GetRoot(input); + case ray::local_scheduler::protocol::MessageType_PutObject: { + auto message = + flatbuffers::GetRoot(input); result_table_add(state->db, from_flatbuf(*message->object_id()), from_flatbuf(*message->task_id()), true, NULL, NULL, NULL); } break; - case MessageType_GetActorFrontierRequest: { - auto message = flatbuffers::GetRoot(input); + case ray::local_scheduler::protocol::MessageType_GetActorFrontierRequest: { + auto message = flatbuffers::GetRoot< + ray::local_scheduler::protocol::GetActorFrontierRequest>(input); ActorID actor_id = from_flatbuf(*message->actor_id()); handle_get_actor_frontier(state, worker, actor_id); } break; - case MessageType_SetActorFrontier: { - auto message = flatbuffers::GetRoot(input); + case ray::local_scheduler::protocol::MessageType_SetActorFrontier: { + auto message = + flatbuffers::GetRoot( + input); handle_set_actor_frontier(state, worker, *message); } break; default: diff --git a/src/local_scheduler/local_scheduler_client.cc b/src/local_scheduler/local_scheduler_client.cc index 2d134d9a9..59f4d297f 100644 --- a/src/local_scheduler/local_scheduler_client.cc +++ b/src/local_scheduler/local_scheduler_client.cc @@ -20,12 +20,14 @@ LocalSchedulerConnection *LocalSchedulerConnection_init( * NOTE(swang): If the local scheduler exits and we are registered as a * worker, we will get killed. */ flatbuffers::FlatBufferBuilder fbb; - auto message = CreateRegisterClientRequest( + auto message = ray::local_scheduler::protocol::CreateRegisterClientRequest( fbb, is_worker, to_flatbuf(fbb, client_id), getpid()); fbb.Finish(message); /* Register the process ID with the local scheduler. */ - int success = write_message(result->conn, MessageType_RegisterClientRequest, - fbb.GetSize(), fbb.GetBufferPointer()); + int success = write_message( + result->conn, + ray::local_scheduler::protocol::MessageType_RegisterClientRequest, + fbb.GetSize(), fbb.GetBufferPointer()); RAY_CHECK(success == 0) << "Unable to register worker with local scheduler"; return result; @@ -38,10 +40,11 @@ void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) { void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreateDisconnectClient(fbb); + auto message = ray::local_scheduler::protocol::CreateDisconnectClient(fbb); fbb.Finish(message); - write_message(conn->conn, MessageType_DisconnectClient, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_DisconnectClient, + fbb.GetSize(), fbb.GetBufferPointer()); } void local_scheduler_log_event(LocalSchedulerConnection *conn, @@ -53,11 +56,12 @@ void local_scheduler_log_event(LocalSchedulerConnection *conn, flatbuffers::FlatBufferBuilder fbb; auto key_string = fbb.CreateString((char *) key, key_length); auto value_string = fbb.CreateString((char *) value, value_length); - auto message = - CreateEventLogMessage(fbb, key_string, value_string, timestamp); + auto message = ray::local_scheduler::protocol::CreateEventLogMessage( + fbb, key_string, value_string, timestamp); fbb.Finish(message); - write_message(conn->conn, MessageType_EventLogMessage, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_EventLogMessage, + fbb.GetSize(), fbb.GetBufferPointer()); } void local_scheduler_submit(LocalSchedulerConnection *conn, @@ -68,11 +72,12 @@ void local_scheduler_submit(LocalSchedulerConnection *conn, auto task_spec = fbb.CreateString(reinterpret_cast(execution_spec.Spec()), execution_spec.SpecSize()); - auto message = - CreateSubmitTaskRequest(fbb, execution_dependencies, task_spec); + auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest( + fbb, execution_dependencies, task_spec); fbb.Finish(message); - write_message(conn->conn, MessageType_SubmitTask, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_SubmitTask, + fbb.GetSize(), fbb.GetBufferPointer()); } void local_scheduler_submit_raylet( @@ -81,16 +86,18 @@ void local_scheduler_submit_raylet( ray::raylet::TaskSpecification task_spec) { flatbuffers::FlatBufferBuilder fbb; auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies); - auto message = CreateSubmitTaskRequest(fbb, execution_dependencies_message, - task_spec.ToFlatbuffer(fbb)); + auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest( + fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb)); fbb.Finish(message); - write_message(conn->conn, MessageType_SubmitTask, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_SubmitTask, + fbb.GetSize(), fbb.GetBufferPointer()); } TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, int64_t *task_size) { - write_message(conn->conn, MessageType_GetTask, 0, NULL); + write_message(conn->conn, ray::local_scheduler::protocol::MessageType_GetTask, + 0, NULL); int64_t type; int64_t reply_size; uint8_t *reply; @@ -101,10 +108,11 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection."; exit(1); } - RAY_CHECK(type == MessageType_ExecuteTask); + RAY_CHECK(type == ray::local_scheduler::protocol::MessageType_ExecuteTask); /* Parse the flatbuffer object. */ - auto reply_message = flatbuffers::GetRoot(reply); + auto reply_message = + flatbuffers::GetRoot(reply); /* Create a copy of the task spec so we can free the reply. */ *task_size = reply_message->task_spec()->size(); @@ -128,47 +136,58 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn, } void local_scheduler_task_done(LocalSchedulerConnection *conn) { - write_message(conn->conn, MessageType_TaskDone, 0, NULL); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_TaskDone, 0, NULL); } void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreateReconstructObject(fbb, to_flatbuf(fbb, object_id)); + auto message = ray::local_scheduler::protocol::CreateReconstructObject( + fbb, to_flatbuf(fbb, object_id)); fbb.Finish(message); - write_message(conn->conn, MessageType_ReconstructObject, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_ReconstructObject, + fbb.GetSize(), fbb.GetBufferPointer()); /* TODO(swang): Propagate the error. */ } void local_scheduler_log_message(LocalSchedulerConnection *conn) { - write_message(conn->conn, MessageType_EventLogMessage, 0, NULL); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_EventLogMessage, 0, + NULL); } void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn) { - write_message(conn->conn, MessageType_NotifyUnblocked, 0, NULL); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_NotifyUnblocked, 0, + NULL); } void local_scheduler_put_object(LocalSchedulerConnection *conn, TaskID task_id, ObjectID object_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreatePutObject(fbb, to_flatbuf(fbb, task_id), - to_flatbuf(fbb, object_id)); + auto message = ray::local_scheduler::protocol::CreatePutObject( + fbb, to_flatbuf(fbb, task_id), to_flatbuf(fbb, object_id)); fbb.Finish(message); - write_message(conn->conn, MessageType_PutObject, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_PutObject, + fbb.GetSize(), fbb.GetBufferPointer()); } const std::vector local_scheduler_get_actor_frontier( LocalSchedulerConnection *conn, ActorID actor_id) { flatbuffers::FlatBufferBuilder fbb; - auto message = CreateGetActorFrontierRequest(fbb, to_flatbuf(fbb, actor_id)); + auto message = ray::local_scheduler::protocol::CreateGetActorFrontierRequest( + fbb, to_flatbuf(fbb, actor_id)); fbb.Finish(message); - write_message(conn->conn, MessageType_GetActorFrontierRequest, fbb.GetSize(), - fbb.GetBufferPointer()); + write_message( + conn->conn, + ray::local_scheduler::protocol::MessageType_GetActorFrontierRequest, + fbb.GetSize(), fbb.GetBufferPointer()); int64_t type; std::vector reply; @@ -177,12 +196,14 @@ const std::vector local_scheduler_get_actor_frontier( RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection."; exit(1); } - RAY_CHECK(type == MessageType_GetActorFrontierReply); + RAY_CHECK(type == + ray::local_scheduler::protocol::MessageType_GetActorFrontierReply); return reply; } void local_scheduler_set_actor_frontier(LocalSchedulerConnection *conn, const std::vector &frontier) { - write_message(conn->conn, MessageType_SetActorFrontier, frontier.size(), - const_cast(frontier.data())); + write_message(conn->conn, + ray::local_scheduler::protocol::MessageType_SetActorFrontier, + frontier.size(), const_cast(frontier.data())); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c790b58f2..b7c97860b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -6,28 +6,40 @@ namespace { +namespace local_scheduler_protocol = ray::local_scheduler::protocol; + #define RAY_CHECK_ENUM(x, y) \ static_assert(static_cast(x) == static_cast(y), "protocol mismatch") // Check consistency between client and server protocol. -RAY_CHECK_ENUM(protocol::MessageType_SubmitTask, MessageType_SubmitTask); -RAY_CHECK_ENUM(protocol::MessageType_TaskDone, MessageType_TaskDone); -RAY_CHECK_ENUM(protocol::MessageType_EventLogMessage, MessageType_EventLogMessage); +RAY_CHECK_ENUM(protocol::MessageType_SubmitTask, + local_scheduler_protocol::MessageType_SubmitTask); +RAY_CHECK_ENUM(protocol::MessageType_TaskDone, + local_scheduler_protocol::MessageType_TaskDone); +RAY_CHECK_ENUM(protocol::MessageType_EventLogMessage, + local_scheduler_protocol::MessageType_EventLogMessage); RAY_CHECK_ENUM(protocol::MessageType_RegisterClientRequest, - MessageType_RegisterClientRequest); + local_scheduler_protocol::MessageType_RegisterClientRequest); RAY_CHECK_ENUM(protocol::MessageType_RegisterClientReply, - MessageType_RegisterClientReply); -RAY_CHECK_ENUM(protocol::MessageType_DisconnectClient, MessageType_DisconnectClient); -RAY_CHECK_ENUM(protocol::MessageType_GetTask, MessageType_GetTask); -RAY_CHECK_ENUM(protocol::MessageType_ExecuteTask, MessageType_ExecuteTask); -RAY_CHECK_ENUM(protocol::MessageType_ReconstructObject, MessageType_ReconstructObject); -RAY_CHECK_ENUM(protocol::MessageType_NotifyUnblocked, MessageType_NotifyUnblocked); -RAY_CHECK_ENUM(protocol::MessageType_PutObject, MessageType_PutObject); + local_scheduler_protocol::MessageType_RegisterClientReply); +RAY_CHECK_ENUM(protocol::MessageType_DisconnectClient, + local_scheduler_protocol::MessageType_DisconnectClient); +RAY_CHECK_ENUM(protocol::MessageType_GetTask, + local_scheduler_protocol::MessageType_GetTask); +RAY_CHECK_ENUM(protocol::MessageType_ExecuteTask, + local_scheduler_protocol::MessageType_ExecuteTask); +RAY_CHECK_ENUM(protocol::MessageType_ReconstructObject, + local_scheduler_protocol::MessageType_ReconstructObject); +RAY_CHECK_ENUM(protocol::MessageType_NotifyUnblocked, + local_scheduler_protocol::MessageType_NotifyUnblocked); +RAY_CHECK_ENUM(protocol::MessageType_PutObject, + local_scheduler_protocol::MessageType_PutObject); RAY_CHECK_ENUM(protocol::MessageType_GetActorFrontierRequest, - MessageType_GetActorFrontierRequest); + local_scheduler_protocol::MessageType_GetActorFrontierRequest); RAY_CHECK_ENUM(protocol::MessageType_GetActorFrontierReply, - MessageType_GetActorFrontierReply); -RAY_CHECK_ENUM(protocol::MessageType_SetActorFrontier, MessageType_SetActorFrontier); + local_scheduler_protocol::MessageType_GetActorFrontierReply); +RAY_CHECK_ENUM(protocol::MessageType_SetActorFrontier, + local_scheduler_protocol::MessageType_SetActorFrontier); /// A helper function to determine whether a given actor task has already been executed /// according to the given actor registry. Returns true if the task is a duplicate.