Fix redefinition of flatbuffer types (#2189)

This commit is contained in:
songqing
2018-06-05 15:08:05 +08:00
committed by Robert Nishihara
parent b56c8ed8dc
commit 451cdb43f6
4 changed files with 133 additions and 83 deletions
@@ -1,4 +1,5 @@
// Local scheduler protocol specification
namespace ray.local_scheduler.protocol;
enum MessageType:int {
// Task is submitted to the local scheduler. This is sent from a worker to a
+48 -32
View File
@@ -528,13 +528,14 @@ void assign_task_to_worker(LocalSchedulerState *state,
/* Construct a flatbuffer object to send to the worker. */
flatbuffers::FlatBufferBuilder fbb;
auto message =
CreateGetTaskReply(fbb, fbb.CreateString((char *) spec, task_spec_size),
fbb.CreateVector(worker->gpus_in_use));
auto message = ray::local_scheduler::protocol::CreateGetTaskReply(
fbb, fbb.CreateString((char *) spec, task_spec_size),
fbb.CreateVector(worker->gpus_in_use));
fbb.Finish(message);
if (write_message(worker->sock, MessageType_ExecuteTask, fbb.GetSize(),
(uint8_t *) fbb.GetBufferPointer()) < 0) {
if (write_message(worker->sock,
ray::local_scheduler::protocol::MessageType_ExecuteTask,
fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) {
if (errno == EPIPE || errno == EBADF) {
/* Something went wrong, so kill the worker. */
kill_worker(state, worker, false, false);
@@ -923,9 +924,10 @@ void reconstruct_object(LocalSchedulerState *state,
reconstruct_object_lookup_callback, (void *) state);
}
void handle_client_register(LocalSchedulerState *state,
LocalSchedulerClient *worker,
const RegisterClientRequest *message) {
void handle_client_register(
LocalSchedulerState *state,
LocalSchedulerClient *worker,
const ray::local_scheduler::protocol::RegisterClientRequest *message) {
/* Make sure this worker hasn't already registered. */
RAY_CHECK(!worker->registered);
worker->registered = true;
@@ -1029,13 +1031,15 @@ void handle_get_actor_frontier(LocalSchedulerState *state,
frontier_vector.push_back(frontier[handle.first]);
}
flatbuffers::FlatBufferBuilder fbb;
auto reply = CreateActorFrontier(
auto reply = ray::local_scheduler::protocol::CreateActorFrontier(
fbb, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, handle_vector),
fbb.CreateVector(task_counter_vector), to_flatbuf(fbb, frontier_vector));
fbb.Finish(reply);
/* Respond with the built ActorFrontier. */
if (write_message(worker->sock, MessageType_GetActorFrontierReply,
fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) {
if (write_message(
worker->sock,
ray::local_scheduler::protocol::MessageType_GetActorFrontierReply,
fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) {
if (errno == EPIPE || errno == EBADF) {
/* Something went wrong, so kill the worker. */
kill_worker(state, worker, false, false);
@@ -1047,9 +1051,10 @@ void handle_get_actor_frontier(LocalSchedulerState *state,
}
}
void handle_set_actor_frontier(LocalSchedulerState *state,
LocalSchedulerClient *worker,
ActorFrontier const &frontier) {
void handle_set_actor_frontier(
LocalSchedulerState *state,
LocalSchedulerClient *worker,
ray::local_scheduler::protocol::ActorFrontier const &frontier) {
/* Parse the ActorFrontier flatbuffer. */
ActorID actor_id = from_flatbuf(*frontier.actor_id());
std::unordered_map<ActorID, int64_t> task_counters;
@@ -1082,8 +1087,10 @@ void process_message(event_loop *loop,
RAY_LOG(DEBUG) << "New event of type " << type;
switch (type) {
case MessageType_SubmitTask: {
auto message = flatbuffers::GetRoot<SubmitTaskRequest>(input);
case ray::local_scheduler::protocol::MessageType_SubmitTask: {
auto message =
flatbuffers::GetRoot<ray::local_scheduler::protocol::SubmitTaskRequest>(
input);
TaskExecutionSpec execution_spec =
TaskExecutionSpec(from_flatbuf(*message->execution_dependencies()),
(TaskSpec *) message->task_spec()->data(),
@@ -1110,9 +1117,9 @@ void process_message(event_loop *loop,
execution_spec);
}
} break;
case MessageType_TaskDone: {
case ray::local_scheduler::protocol::MessageType_TaskDone: {
} break;
case MessageType_DisconnectClient: {
case ray::local_scheduler::protocol::MessageType_DisconnectClient: {
finish_task(state, worker);
RAY_CHECK(!worker->disconnected);
worker->disconnected = true;
@@ -1122,9 +1129,11 @@ void process_message(event_loop *loop,
start_worker(state);
}
} break;
case MessageType_EventLogMessage: {
case ray::local_scheduler::protocol::MessageType_EventLogMessage: {
/* Parse the message. */
auto message = flatbuffers::GetRoot<EventLogMessage>(input);
auto message =
flatbuffers::GetRoot<ray::local_scheduler::protocol::EventLogMessage>(
input);
if (state->db != NULL) {
RayLogger_log_event(state->db, (uint8_t *) message->key()->data(),
message->key()->size(),
@@ -1132,11 +1141,12 @@ void process_message(event_loop *loop,
message->value()->size(), message->timestamp());
}
} break;
case MessageType_RegisterClientRequest: {
auto message = flatbuffers::GetRoot<RegisterClientRequest>(input);
case ray::local_scheduler::protocol::MessageType_RegisterClientRequest: {
auto message = flatbuffers::GetRoot<
ray::local_scheduler::protocol::RegisterClientRequest>(input);
handle_client_register(state, worker, message);
} break;
case MessageType_GetTask: {
case ray::local_scheduler::protocol::MessageType_GetTask: {
/* If this worker reports a completed task, account for resources. */
finish_task(state, worker);
/* Let the scheduling algorithm process the fact that there is an available
@@ -1147,8 +1157,10 @@ void process_message(event_loop *loop,
handle_actor_worker_available(state, state->algorithm_state, worker);
}
} break;
case MessageType_ReconstructObject: {
auto message = flatbuffers::GetRoot<ReconstructObject>(input);
case ray::local_scheduler::protocol::MessageType_ReconstructObject: {
auto message =
flatbuffers::GetRoot<ray::local_scheduler::protocol::ReconstructObject>(
input);
if (worker->task_in_progress != NULL && !worker->is_blocked) {
/* If the worker was executing a task (i.e. non-driver) and it wasn't
* already blocked on an object that's not locally available, update its
@@ -1178,7 +1190,7 @@ void process_message(event_loop *loop,
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock;
handle_client_disconnect(state, worker);
} break;
case MessageType_NotifyUnblocked: {
case ray::local_scheduler::protocol::MessageType_NotifyUnblocked: {
/* TODO(rkn): A driver may call this as well, right? */
if (worker->task_in_progress != NULL) {
/* If the worker was executing a task (i.e. non-driver), update its
@@ -1206,18 +1218,22 @@ void process_message(event_loop *loop,
}
print_worker_info("Worker unblocked", state->algorithm_state);
} break;
case MessageType_PutObject: {
auto message = flatbuffers::GetRoot<PutObject>(input);
case ray::local_scheduler::protocol::MessageType_PutObject: {
auto message =
flatbuffers::GetRoot<ray::local_scheduler::protocol::PutObject>(input);
result_table_add(state->db, from_flatbuf(*message->object_id()),
from_flatbuf(*message->task_id()), true, NULL, NULL, NULL);
} break;
case MessageType_GetActorFrontierRequest: {
auto message = flatbuffers::GetRoot<GetActorFrontierRequest>(input);
case ray::local_scheduler::protocol::MessageType_GetActorFrontierRequest: {
auto message = flatbuffers::GetRoot<
ray::local_scheduler::protocol::GetActorFrontierRequest>(input);
ActorID actor_id = from_flatbuf(*message->actor_id());
handle_get_actor_frontier(state, worker, actor_id);
} break;
case MessageType_SetActorFrontier: {
auto message = flatbuffers::GetRoot<ActorFrontier>(input);
case ray::local_scheduler::protocol::MessageType_SetActorFrontier: {
auto message =
flatbuffers::GetRoot<ray::local_scheduler::protocol::ActorFrontier>(
input);
handle_set_actor_frontier(state, worker, *message);
} break;
default:
+58 -37
View File
@@ -20,12 +20,14 @@ LocalSchedulerConnection *LocalSchedulerConnection_init(
* NOTE(swang): If the local scheduler exits and we are registered as a
* worker, we will get killed. */
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateRegisterClientRequest(
auto message = ray::local_scheduler::protocol::CreateRegisterClientRequest(
fbb, is_worker, to_flatbuf(fbb, client_id), getpid());
fbb.Finish(message);
/* Register the process ID with the local scheduler. */
int success = write_message(result->conn, MessageType_RegisterClientRequest,
fbb.GetSize(), fbb.GetBufferPointer());
int success = write_message(
result->conn,
ray::local_scheduler::protocol::MessageType_RegisterClientRequest,
fbb.GetSize(), fbb.GetBufferPointer());
RAY_CHECK(success == 0) << "Unable to register worker with local scheduler";
return result;
@@ -38,10 +40,11 @@ void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) {
void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateDisconnectClient(fbb);
auto message = ray::local_scheduler::protocol::CreateDisconnectClient(fbb);
fbb.Finish(message);
write_message(conn->conn, MessageType_DisconnectClient, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_DisconnectClient,
fbb.GetSize(), fbb.GetBufferPointer());
}
void local_scheduler_log_event(LocalSchedulerConnection *conn,
@@ -53,11 +56,12 @@ void local_scheduler_log_event(LocalSchedulerConnection *conn,
flatbuffers::FlatBufferBuilder fbb;
auto key_string = fbb.CreateString((char *) key, key_length);
auto value_string = fbb.CreateString((char *) value, value_length);
auto message =
CreateEventLogMessage(fbb, key_string, value_string, timestamp);
auto message = ray::local_scheduler::protocol::CreateEventLogMessage(
fbb, key_string, value_string, timestamp);
fbb.Finish(message);
write_message(conn->conn, MessageType_EventLogMessage, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_EventLogMessage,
fbb.GetSize(), fbb.GetBufferPointer());
}
void local_scheduler_submit(LocalSchedulerConnection *conn,
@@ -68,11 +72,12 @@ void local_scheduler_submit(LocalSchedulerConnection *conn,
auto task_spec =
fbb.CreateString(reinterpret_cast<char *>(execution_spec.Spec()),
execution_spec.SpecSize());
auto message =
CreateSubmitTaskRequest(fbb, execution_dependencies, task_spec);
auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies, task_spec);
fbb.Finish(message);
write_message(conn->conn, MessageType_SubmitTask, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_SubmitTask,
fbb.GetSize(), fbb.GetBufferPointer());
}
void local_scheduler_submit_raylet(
@@ -81,16 +86,18 @@ void local_scheduler_submit_raylet(
ray::raylet::TaskSpecification task_spec) {
flatbuffers::FlatBufferBuilder fbb;
auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies);
auto message = CreateSubmitTaskRequest(fbb, execution_dependencies_message,
task_spec.ToFlatbuffer(fbb));
auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb));
fbb.Finish(message);
write_message(conn->conn, MessageType_SubmitTask, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_SubmitTask,
fbb.GetSize(), fbb.GetBufferPointer());
}
TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
int64_t *task_size) {
write_message(conn->conn, MessageType_GetTask, 0, NULL);
write_message(conn->conn, ray::local_scheduler::protocol::MessageType_GetTask,
0, NULL);
int64_t type;
int64_t reply_size;
uint8_t *reply;
@@ -101,10 +108,11 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection.";
exit(1);
}
RAY_CHECK(type == MessageType_ExecuteTask);
RAY_CHECK(type == ray::local_scheduler::protocol::MessageType_ExecuteTask);
/* Parse the flatbuffer object. */
auto reply_message = flatbuffers::GetRoot<GetTaskReply>(reply);
auto reply_message =
flatbuffers::GetRoot<ray::local_scheduler::protocol::GetTaskReply>(reply);
/* Create a copy of the task spec so we can free the reply. */
*task_size = reply_message->task_spec()->size();
@@ -128,47 +136,58 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
}
void local_scheduler_task_done(LocalSchedulerConnection *conn) {
write_message(conn->conn, MessageType_TaskDone, 0, NULL);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_TaskDone, 0, NULL);
}
void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateReconstructObject(fbb, to_flatbuf(fbb, object_id));
auto message = ray::local_scheduler::protocol::CreateReconstructObject(
fbb, to_flatbuf(fbb, object_id));
fbb.Finish(message);
write_message(conn->conn, MessageType_ReconstructObject, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_ReconstructObject,
fbb.GetSize(), fbb.GetBufferPointer());
/* TODO(swang): Propagate the error. */
}
void local_scheduler_log_message(LocalSchedulerConnection *conn) {
write_message(conn->conn, MessageType_EventLogMessage, 0, NULL);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_EventLogMessage, 0,
NULL);
}
void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn) {
write_message(conn->conn, MessageType_NotifyUnblocked, 0, NULL);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_NotifyUnblocked, 0,
NULL);
}
void local_scheduler_put_object(LocalSchedulerConnection *conn,
TaskID task_id,
ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreatePutObject(fbb, to_flatbuf(fbb, task_id),
to_flatbuf(fbb, object_id));
auto message = ray::local_scheduler::protocol::CreatePutObject(
fbb, to_flatbuf(fbb, task_id), to_flatbuf(fbb, object_id));
fbb.Finish(message);
write_message(conn->conn, MessageType_PutObject, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_PutObject,
fbb.GetSize(), fbb.GetBufferPointer());
}
const std::vector<uint8_t> local_scheduler_get_actor_frontier(
LocalSchedulerConnection *conn,
ActorID actor_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateGetActorFrontierRequest(fbb, to_flatbuf(fbb, actor_id));
auto message = ray::local_scheduler::protocol::CreateGetActorFrontierRequest(
fbb, to_flatbuf(fbb, actor_id));
fbb.Finish(message);
write_message(conn->conn, MessageType_GetActorFrontierRequest, fbb.GetSize(),
fbb.GetBufferPointer());
write_message(
conn->conn,
ray::local_scheduler::protocol::MessageType_GetActorFrontierRequest,
fbb.GetSize(), fbb.GetBufferPointer());
int64_t type;
std::vector<uint8_t> reply;
@@ -177,12 +196,14 @@ const std::vector<uint8_t> local_scheduler_get_actor_frontier(
RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection.";
exit(1);
}
RAY_CHECK(type == MessageType_GetActorFrontierReply);
RAY_CHECK(type ==
ray::local_scheduler::protocol::MessageType_GetActorFrontierReply);
return reply;
}
void local_scheduler_set_actor_frontier(LocalSchedulerConnection *conn,
const std::vector<uint8_t> &frontier) {
write_message(conn->conn, MessageType_SetActorFrontier, frontier.size(),
const_cast<uint8_t *>(frontier.data()));
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_SetActorFrontier,
frontier.size(), const_cast<uint8_t *>(frontier.data()));
}
+26 -14
View File
@@ -6,28 +6,40 @@
namespace {
namespace local_scheduler_protocol = ray::local_scheduler::protocol;
#define RAY_CHECK_ENUM(x, y) \
static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")
// Check consistency between client and server protocol.
RAY_CHECK_ENUM(protocol::MessageType_SubmitTask, MessageType_SubmitTask);
RAY_CHECK_ENUM(protocol::MessageType_TaskDone, MessageType_TaskDone);
RAY_CHECK_ENUM(protocol::MessageType_EventLogMessage, MessageType_EventLogMessage);
RAY_CHECK_ENUM(protocol::MessageType_SubmitTask,
local_scheduler_protocol::MessageType_SubmitTask);
RAY_CHECK_ENUM(protocol::MessageType_TaskDone,
local_scheduler_protocol::MessageType_TaskDone);
RAY_CHECK_ENUM(protocol::MessageType_EventLogMessage,
local_scheduler_protocol::MessageType_EventLogMessage);
RAY_CHECK_ENUM(protocol::MessageType_RegisterClientRequest,
MessageType_RegisterClientRequest);
local_scheduler_protocol::MessageType_RegisterClientRequest);
RAY_CHECK_ENUM(protocol::MessageType_RegisterClientReply,
MessageType_RegisterClientReply);
RAY_CHECK_ENUM(protocol::MessageType_DisconnectClient, MessageType_DisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType_GetTask, MessageType_GetTask);
RAY_CHECK_ENUM(protocol::MessageType_ExecuteTask, MessageType_ExecuteTask);
RAY_CHECK_ENUM(protocol::MessageType_ReconstructObject, MessageType_ReconstructObject);
RAY_CHECK_ENUM(protocol::MessageType_NotifyUnblocked, MessageType_NotifyUnblocked);
RAY_CHECK_ENUM(protocol::MessageType_PutObject, MessageType_PutObject);
local_scheduler_protocol::MessageType_RegisterClientReply);
RAY_CHECK_ENUM(protocol::MessageType_DisconnectClient,
local_scheduler_protocol::MessageType_DisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType_GetTask,
local_scheduler_protocol::MessageType_GetTask);
RAY_CHECK_ENUM(protocol::MessageType_ExecuteTask,
local_scheduler_protocol::MessageType_ExecuteTask);
RAY_CHECK_ENUM(protocol::MessageType_ReconstructObject,
local_scheduler_protocol::MessageType_ReconstructObject);
RAY_CHECK_ENUM(protocol::MessageType_NotifyUnblocked,
local_scheduler_protocol::MessageType_NotifyUnblocked);
RAY_CHECK_ENUM(protocol::MessageType_PutObject,
local_scheduler_protocol::MessageType_PutObject);
RAY_CHECK_ENUM(protocol::MessageType_GetActorFrontierRequest,
MessageType_GetActorFrontierRequest);
local_scheduler_protocol::MessageType_GetActorFrontierRequest);
RAY_CHECK_ENUM(protocol::MessageType_GetActorFrontierReply,
MessageType_GetActorFrontierReply);
RAY_CHECK_ENUM(protocol::MessageType_SetActorFrontier, MessageType_SetActorFrontier);
local_scheduler_protocol::MessageType_GetActorFrontierReply);
RAY_CHECK_ENUM(protocol::MessageType_SetActorFrontier,
local_scheduler_protocol::MessageType_SetActorFrontier);
/// A helper function to determine whether a given actor task has already been executed
/// according to the given actor registry. Returns true if the task is a duplicate.