diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..25ecc27d3 --- /dev/null +++ b/Makefile @@ -0,0 +1,86 @@ +SRC_PATH = src +PROTOS_PATH = protos +LIB_PATH = lib/orchlib + +CXX = g++ +CPPFLAGS += -I/usr/local/include -pthread +CXXFLAGS += -std=c++11 -fPIC -I$(SRC_PATH) +LDFLAGS += -L/usr/local/lib -lgrpc++_unsecure -lgrpc -lprotobuf -lpthread -ldl +PROTOC = protoc +GRPC_CPP_PLUGIN = grpc_cpp_plugin +GRPC_CPP_PLUGIN_PATH ?= `which $(GRPC_CPP_PLUGIN)` + +vpath %.proto $(PROTOS_PATH) + +all: system-check $(LIB_PATH)/liborchlib.so $(SRC_PATH)/server + +$(LIB_PATH)/liborchlib.so: $(SRC_PATH)/orchestra.pb.o $(SRC_PATH)/orchestra.grpc.pb.o $(LIB_PATH)/orchlib.o + $(CXX) $^ $(LDFLAGS) -shared -o $@ + +$(SRC_PATH)/server: $(SRC_PATH)/orchestra.pb.o $(SRC_PATH)/orchestra.grpc.pb.o $(SRC_PATH)/server.o + $(CXX) $^ $(LDFLAGS) -o $@ + +.PRECIOUS: ./src/%.grpc.pb.cc +$(SRC_PATH)/%.grpc.pb.cc: %.proto + $(PROTOC) -I $(PROTOS_PATH) --grpc_out=$(SRC_PATH)/ --plugin=protoc-gen-grpc=$(GRPC_CPP_PLUGIN_PATH) $< + +.PRECIOUS: ./src/%.pb.cc +$(SRC_PATH)/%.pb.cc: %.proto + $(PROTOC) -I $(PROTOS_PATH) --cpp_out=./src $< + +clean: + rm -f $(SRC_PATH)/*.o $(LIB_PATH)/*.o $(SRC_PATH)/*.pb.cc $(SRC_PATH)/*.pb.h $(LIB_PATH)/orchlib.so $(SRC_PATH)/server + + +# The following is to test your system and ensure a smoother experience. +# They are by no means necessary to actually compile a grpc-enabled software. + +PROTOC_CMD = which $(PROTOC) +PROTOC_CHECK_CMD = $(PROTOC) --version | grep -q libprotoc.3 +PLUGIN_CHECK_CMD = which $(GRPC_CPP_PLUGIN) +HAS_PROTOC = $(shell $(PROTOC_CMD) > /dev/null && echo true || echo false) +ifeq ($(HAS_PROTOC),true) +HAS_VALID_PROTOC = $(shell $(PROTOC_CHECK_CMD) 2> /dev/null && echo true || echo false) +endif +HAS_PLUGIN = $(shell $(PLUGIN_CHECK_CMD) > /dev/null && echo true || echo false) + +SYSTEM_OK = false +ifeq ($(HAS_VALID_PROTOC),true) +ifeq ($(HAS_PLUGIN),true) +SYSTEM_OK = true +endif +endif + +system-check: +ifneq ($(HAS_VALID_PROTOC),true) + @echo " DEPENDENCY ERROR" + @echo + @echo "You don't have protoc 3.0.0 installed in your path." + @echo "Please install Google protocol buffers 3.0.0 and its compiler." + @echo "You can find it here:" + @echo + @echo " https://github.com/google/protobuf/releases/tag/v3.0.0-alpha-1" + @echo + @echo "Here is what I get when trying to evaluate your version of protoc:" + @echo + -$(PROTOC) --version + @echo + @echo +endif +ifneq ($(HAS_PLUGIN),true) + @echo " DEPENDENCY ERROR" + @echo + @echo "You don't have the grpc c++ protobuf plugin installed in your path." + @echo "Please install grpc. You can find it here:" + @echo + @echo " https://github.com/grpc/grpc" + @echo + @echo "Here is what I get when trying to detect if you have the plugin:" + @echo + -which $(GRPC_CPP_PLUGIN) + @echo + @echo +endif +ifneq ($(SYSTEM_OK),true) + @false +endif diff --git a/lib/orchlib/orchlib.cc b/lib/orchlib/orchlib.cc new file mode 100644 index 000000000..ca87b2755 --- /dev/null +++ b/lib/orchlib/orchlib.cc @@ -0,0 +1,52 @@ +#include +#include +#include + +#include + +#include "orchestra.grpc.pb.h" +#include "orchlib.h" + +using grpc::Channel; +using grpc::ClientContext; +using grpc::Status; + +class Client { + public: + Client(std::shared_ptr channel) + : stub_(Orchestra::NewStub(channel)) {} + + void RemoteCall(const std::string& name) { + RemoteCallRequest request; + request.set_name(name); + + RemoteCallReply reply; + ClientContext context; + + Status status = stub_->RemoteCall(&context, request, &reply); + + return; + } + + private: + std::unique_ptr stub_; +}; + +void* orch_create_context(const char* server_addr) { + Client* client = new Client(grpc::CreateChannel("localhost:50052", grpc::InsecureChannelCredentials())); + return client; +} + +size_t orch_remote_call(void* context, const char* name, void* args) { + Client* client = (Client*)context; + client->RemoteCall(std::string(name)); +} + +int main(int argc, char** argv) { + Client greeter( + grpc::CreateChannel("localhost:50052", grpc::InsecureChannelCredentials())); + std::string user("world"); + greeter.RemoteCall(user); + + return 0; +} diff --git a/lib/orchlib/orchlib.h b/lib/orchlib/orchlib.h new file mode 100644 index 000000000..694c3389c --- /dev/null +++ b/lib/orchlib/orchlib.h @@ -0,0 +1,6 @@ +extern "C" { + +void* orch_create_context(const char* server_addr); +size_t orch_remote_call(void* context, const char* name, void* args); + +} diff --git a/lib/orchpy/orchpy/__init__.py b/lib/orchpy/orchpy/__init__.py new file mode 100644 index 000000000..0cebf2bfd --- /dev/null +++ b/lib/orchpy/orchpy/__init__.py @@ -0,0 +1 @@ +from .context import Context diff --git a/lib/orchpy/orchpy/context.pyx b/lib/orchpy/orchpy/context.pyx new file mode 100644 index 000000000..cf2299dca --- /dev/null +++ b/lib/orchpy/orchpy/context.pyx @@ -0,0 +1,14 @@ +cdef extern void* orch_create_context(const char* server_addr); +cdef extern size_t orch_remote_call(void* context, const char* name, void* args); + +cdef class Context: + cdef void* context + + def __cinit__(self): + self.context = NULL + + def connect(self, server_addr): + self.context = orch_create_context(server_addr) + + def call(self, name): + orch_remote_call(self.context, name, 0) diff --git a/lib/orchpy/setup.py b/lib/orchpy/setup.py new file mode 100644 index 000000000..e0b12601e --- /dev/null +++ b/lib/orchpy/setup.py @@ -0,0 +1,16 @@ +from setuptools import setup, Extension, find_packages +from Cython.Build import cythonize + +# because of relative paths, this must be run from inside orch/lib/orchpy/ + +setup( + name = "orchestra", + version = "0.1.dev0", + ext_modules = cythonize([ + Extension("orchpy/context", + sources = ["orchpy/context.pyx"], libraries=["orchlib"], + library_dirs=['../orchlib/'])], + compiler_directives={'language_level': 3}), + use_2to3=True, + packages=find_packages() +) diff --git a/protos/orchestra.proto b/protos/orchestra.proto new file mode 100644 index 000000000..9e1b49511 --- /dev/null +++ b/protos/orchestra.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +message RegisterWorkerRequest { + string address = 1; +} + +message RegisterWorkerReply { + uint64 workerid = 1; +} + +message Value { + uint64 ref = 1; // for pass by reference + bytes data = 2; // for pass by value +} + +message RemoteCallRequest { + string name = 1; + repeated Value arg = 2; +} + +message RemoteCallReply { + uint64 result = 1; +} + +message PullObjectRequest { + uint64 ref = 1; +} + +service Orchestra { + rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerReply); + // rpc RegisterFunction + rpc RemoteCall(RemoteCallRequest) returns (RemoteCallReply); + // rpc PushObject + // rpc PullObject(PullObjectRequest) + // rpc DeliverRequest +} diff --git a/src/server.cc b/src/server.cc new file mode 100644 index 000000000..2147ac8dc --- /dev/null +++ b/src/server.cc @@ -0,0 +1,51 @@ +#include +#include +#include + +#include + +#include "orchestra.grpc.pb.h" + +using grpc::Server; +using grpc::ServerBuilder; +using grpc::ServerContext; +using grpc::Status; +// using helloworld::HelloRequest; +// using helloworld::HelloReply; +// using helloworld::Greeter; + +// Logic and data behind the server's behavior. +class OrchestraServiceImpl final : public Orchestra::Service { + Status RemoteCall(ServerContext* context, const RemoteCallRequest* request, + RemoteCallReply* reply) override { + std::cout << "called" << std::endl; + // std::string prefix("Hello "); + // reply->set_message(prefix + request->name()); + return Status::OK; + } +}; + +void RunServer() { + std::string server_address("0.0.0.0:50052"); + OrchestraServiceImpl service; + + ServerBuilder builder; + // Listen on the given address without any authentication mechanism. + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + // Register "service" as the instance through which we'll communicate with + // clients. In this case it corresponds to an *synchronous* service. + builder.RegisterService(&service); + // Finally assemble the server. + std::unique_ptr server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + + // Wait for the server to shutdown. Note that some other thread must be + // responsible for shutting down the server for this call to ever return. + server->Wait(); +} + +int main(int argc, char** argv) { + RunServer(); + + return 0; +}