From 4f5a637a8f0b1839f49bb0e6d034ba955301a474 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 25 Mar 2016 13:46:12 -0700 Subject: [PATCH] document and clean up orchestra protocol buffer specification --- protos/orchestra.proto | 242 ++++++++++++++++++++++------------------- protos/types.proto | 10 +- src/objstore.cc | 2 +- src/scheduler.cc | 7 +- 4 files changed, 139 insertions(+), 122 deletions(-) diff --git a/protos/orchestra.proto b/protos/orchestra.proto index 638a7c9a5..35cb79a45 100644 --- a/protos/orchestra.proto +++ b/protos/orchestra.proto @@ -1,82 +1,20 @@ +// This file defines the GRPC interface between scheduler, object stores and +// workers. These are used for communication over the network. + +// Terminology: +// Worker: A cluster consists of multiple worker processes (typically one +// per core) which execute tasks that can access objects from object stores. +// Object store: Typically there is one object store per node which holds the +// objects locally stored on that node. +// Scheduler: The scheduler process keeps track of a mapping from object +// references to object stores, orchestrates data transfer between object +// stores and assigns tasks to workers. 
+ syntax = "proto3"; import "types.proto"; -message AckReply { - string errormsg = 1; -} - -message RegisterWorkerRequest { - string worker_address = 1; - string objstore_address = 2; -} - -message RegisterWorkerReply { - uint64 workerid = 1; -} - -message RegisterObjStoreRequest { - string address = 1; -} - -message RegisterObjStoreReply { - uint64 objstoreid = 1; -} - -message RegisterFunctionRequest { - uint64 workerid = 1; - string fnname = 2; - uint64 num_return_vals = 3; -} - -message RemoteCallRequest { - Call call = 1; -} - -message RemoteCallReply { - repeated uint64 result = 1; -} - -message PullObjRequest { - uint64 workerid = 1; - uint64 objref = 2; -} - -message PushObjRequest { - uint64 workerid = 1; -} - -message PushObjReply { - uint64 objref = 1; -} - -message ObjReadyRequest { - uint64 objref = 1; - uint64 objstoreid = 2; -} - -message WorkerReadyRequest { - uint64 workerid = 1; -} - -message ChangeCountRequest { - uint64 objref = 1; -} - -message SchedulerDebugInfoRequest { - bool do_scheduling = 1; -} - -message FnTableEntry { - repeated uint64 workerid = 1; - uint64 num_return_vals = 2; -} - -message SchedulerDebugInfoReply { - repeated Call task = 1; - repeated uint64 avail_worker = 3; - map<string, FnTableEntry> function_table = 2; -} +// Scheduler service Scheduler { // Register a new worker with the scheduler @@ -87,77 +25,159 @@ service Scheduler { rpc RegisterFunction(RegisterFunctionRequest) returns (AckReply); // Asks the scheduler to execute a task, immediately returns an object reference to the result rpc RemoteCall(RemoteCallRequest) returns (RemoteCallReply); - // increment the count of the object reference + // Increment the count of the object reference rpc IncrementCount(ChangeCountRequest) returns (AckReply); - // decrement the count of the object reference + // Decrement the count of the object reference rpc DecrementCount(ChangeCountRequest) returns (AckReply); - // request an object reference for an object that will be pushed to an object store + 
// Request an object reference for an object that will be pushed to an object store rpc PushObj(PushObjRequest) returns (PushObjReply); - // request delivery of an object + // Request delivery of an object from an object store that holds the object to the local object store rpc PullObj(PullObjRequest) returns (AckReply); - // used by an object store to tell the scheduler that an object is ready + // Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared) rpc ObjReady(ObjReadyRequest) returns (AckReply); - // used by the worker to report back and ask for more work + // Used by the worker to report back and ask for more work rpc WorkerReady(WorkerReadyRequest) returns (AckReply); - // get debugging information from the scheduler + // Get debugging information from the scheduler rpc SchedulerDebugInfo(SchedulerDebugInfoRequest) returns (SchedulerDebugInfoReply); } -message DeliverObjRequest { - string objstore_address = 1; // objstore to deliver the object to - uint64 objref = 2; // reference of object that gets delivered +message AckReply { } -message RegisterObjRequest { - uint64 objref = 1; // reference of object that gets registered +message RegisterWorkerRequest { + string worker_address = 1; // IP address of the worker being registered + string objstore_address = 2; // IP address of the object store the worker is connected to } -message RegisterObjReply { - uint64 handle = 1; // handle to memory segment where object is stored +message RegisterWorkerReply { + uint64 workerid = 1; // Worker ID assigned by the scheduler } -message ObjChunk { - uint64 objref = 1; - uint64 totalsize = 2; - bytes data = 3; +message RegisterObjStoreRequest { + string objstore_address = 1; // IP address of the object store being registered } -message GetObjRequest { - uint64 objref = 1; +message RegisterObjStoreReply { + uint64 objstoreid = 1; // Object store ID assigned by the scheduler } -message GetObjReply { - string bucket = 
1; - uint64 handle = 2; - uint64 size = 3; +message RegisterFunctionRequest { + uint64 workerid = 1; // Worker that can execute the function + string fnname = 2; // Name of the function that is registered + uint64 num_return_vals = 3; // Number of return values of the function } -message ObjStoreDebugInfoRequest { - repeated uint64 objref = 1; // get protocol buffer objects corresponding to objref +message RemoteCallRequest { + Call call = 1; // Contains name of the function to be executed and arguments } -message ObjStoreDebugInfoReply { - repeated uint64 objref = 1; // list of object references in the store - repeated Obj obj = 2; // protocol buffer objects that were requested +message RemoteCallReply { + repeated uint64 result = 1; // Object references of the function return values } +message PullObjRequest { + uint64 workerid = 1; // Worker that tries to pull the object + uint64 objref = 2; // Object reference of the object being pulled +} + +message PushObjRequest { + uint64 workerid = 1; // Worker that tries to push an object +} + +message PushObjReply { + uint64 objref = 1; // Object reference assigned by the scheduler to the object +} + +message ObjReadyRequest { + uint64 objref = 1; // Object reference of the object that has been finalized + uint64 objstoreid = 2; // ID of the object store the object lives on +} + +message WorkerReadyRequest { + uint64 workerid = 1; // ID of the worker which is ready +} + +message ChangeCountRequest { + uint64 objref = 1; // Object reference of the object whose reference count is increased or decreased +} + +// The following messages are used for debugging purposes: + +message SchedulerDebugInfoRequest { +} + +message FnTableEntry { + repeated uint64 workerid = 1; // ID of the worker that can execute the function + uint64 num_return_vals = 2; // Number of return values of the function +} + +message SchedulerDebugInfoReply { + repeated Call task = 1; // Tasks on the task queue + repeated uint64 avail_worker = 3; // List of 
workers waiting to get a task assigned + map<string, FnTableEntry> function_table = 2; // Table of all available remote function +} + +// Object stores + service ObjStore { - // Request to deliver the data that comes with an object reference to another object store + // Request to deliver an object to another object store (called by the scheduler) rpc DeliverObj(DeliverObjRequest) returns (AckReply); - // Accept incoming data from another object store + // Accept incoming data from another object store, as a stream of object chunks rpc StreamObj(stream ObjChunk) returns (AckReply); // Get debug info from the object store rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply); } +message DeliverObjRequest { + string objstore_address = 1; // Object store to deliver the object to + uint64 objref = 2; // Reference of object that gets delivered +} + +message RegisterObjRequest { + uint64 objref = 1; // Reference of object that gets registered +} + +message RegisterObjReply { + uint64 handle = 1; // Handle to memory segment where object is stored +} + +message ObjChunk { + uint64 objref = 1; // Object reference of the object being streamed + uint64 totalsize = 2; // Total size of the object + bytes data = 3; // Data for this chunk of the object +} + +message GetObjRequest { + uint64 objref = 1; // Object reference of the object being requested by the worker +} + +message GetObjReply { + string bucket = 1; // Name of the shared memory segment where the object is stored + uint64 handle = 2; // Shared memory pointer to the object + uint64 size = 3; // Total size of the object in bytes +} + +// These messages are for debugging purposes: + +message ObjStoreDebugInfoRequest { + repeated uint64 objref = 1; // Object references we want to retrieve from the store for inspection +} + +message ObjStoreDebugInfoReply { + repeated uint64 objref = 1; // List of object references in the store + repeated Obj obj = 2; // Protocol buffer objects that were requested +} + +// Workers + 
+service WorkerService { + rpc InvokeCall(InvokeCallRequest) returns (InvokeCallReply); // Scheduler calls a function from the worker +} + message InvokeCallRequest { - Call call = 1; + Call call = 1; // Contains name of the function to be executed and arguments } message InvokeCallReply { } - -service WorkerService { - rpc InvokeCall(InvokeCallRequest) returns (InvokeCallReply); -} diff --git a/protos/types.proto b/protos/types.proto index e2558cbe9..0eb487b1e 100644 --- a/protos/types.proto +++ b/protos/types.proto @@ -46,14 +46,14 @@ message Dict { } message Value { - uint64 ref = 1; // for pass by reference - Obj obj = 2; // for pass by value + uint64 ref = 1; // For pass by reference + Obj obj = 2; // For pass by value } message Call { - string name = 1; - repeated Value arg = 2; - repeated uint64 result = 3; // object references for result + string name = 1; // Name of the function call + repeated Value arg = 2; // List of arguments, can be either object references or protobuf descriptions of object passed by value + repeated uint64 result = 3; // Object references for result } message Array { diff --git a/src/objstore.cc b/src/objstore.cc index 635c93278..429fa45cb 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -27,7 +27,7 @@ ObjStoreService::ObjStoreService(const std::string& objstore_address, std::share recv_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), true); ClientContext context; RegisterObjStoreRequest request; - request.set_address(objstore_address); + request.set_objstore_address(objstore_address); RegisterObjStoreReply reply; scheduler_stub_->RegisterObjStore(&context, request, &reply); objstoreid_ = reply.objstoreid(); diff --git a/src/scheduler.cc b/src/scheduler.cc index 9b0d1b96d..dcc20e2b8 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -59,9 +59,9 @@ Status SchedulerService::PullObj(ServerContext* context, const PullObjRequest* r Status SchedulerService::RegisterObjStore(ServerContext* context, 
const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) { std::lock_guard objstore_lock(objstores_lock_); ObjStoreId objstoreid = objstores_.size(); - auto channel = grpc::CreateChannel(request->address(), grpc::InsecureChannelCredentials()); + auto channel = grpc::CreateChannel(request->objstore_address(), grpc::InsecureChannelCredentials()); objstores_.push_back(ObjStoreHandle()); - objstores_[objstoreid].address = request->address(); + objstores_[objstoreid].address = request->objstore_address(); objstores_[objstoreid].channel = channel; objstores_[objstoreid].objstore_stub = ObjStore::NewStub(channel); reply->set_objstoreid(objstoreid); @@ -263,9 +263,6 @@ void SchedulerService::register_function(const std::string& name, WorkerId worke } void SchedulerService::debug_info(const SchedulerDebugInfoRequest& request, SchedulerDebugInfoReply* reply) { - if (request.do_scheduling()) { - schedule(); - } fntable_lock_.lock(); auto function_table = reply->mutable_function_table(); for (const auto& entry : fntable_) {