From 6ed641177dd71015548dbddad076fbea0e616109 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 26 Oct 2016 23:24:40 -0700 Subject: [PATCH] Remove unnecessary files. (#4) --- include/ray/logging.h | 68 --------- include/ray/ray.h | 46 ------ protos/graph.proto | 42 ------ protos/ray.proto | 324 ---------------------------------------- protos/types.proto | 118 --------------- scripts/gen-protobuf.sh | 7 - 6 files changed, 605 deletions(-) delete mode 100644 include/ray/logging.h delete mode 100644 include/ray/ray.h delete mode 100644 protos/graph.proto delete mode 100644 protos/ray.proto delete mode 100644 protos/types.proto delete mode 100755 scripts/gen-protobuf.sh diff --git a/include/ray/logging.h b/include/ray/logging.h deleted file mode 100644 index f4934075e..000000000 --- a/include/ray/logging.h +++ /dev/null @@ -1,68 +0,0 @@ -#include -#include -#include - -#include - -struct RayConfig { - bool log_to_file = false; - std::ofstream logfile; -}; - -extern RayConfig global_ray_config; - -#define RAY_VERBOSE -1 -#define RAY_INFO 0 -#define RAY_DEBUG 1 -#define RAY_FATAL 2 -#define RAY_REFCOUNT RAY_VERBOSE -#define RAY_ALIAS RAY_VERBOSE - -#ifdef _MSC_VER -extern "C" __declspec(dllimport) int __stdcall IsDebuggerPresent(); -#define RAY_BREAK_IF_DEBUGGING() IsDebuggerPresent() && (__debugbreak(), 1) -#else -#define RAY_BREAK_IF_DEBUGGING() -#endif - -#define RAY_LOG(LEVEL, MESSAGE) \ - if (LEVEL == RAY_VERBOSE) { \ - \ - } else if (LEVEL == RAY_FATAL) { \ - std::cerr << "fatal error occured: " << MESSAGE << std::endl; \ - if (global_ray_config.log_to_file) { \ - global_ray_config.logfile << "fatal error occured: " << MESSAGE << std::endl; \ - } \ - RAY_BREAK_IF_DEBUGGING(); \ - std::exit(1); \ - } else if (LEVEL == RAY_DEBUG) { \ - \ - } else { \ - if (global_ray_config.log_to_file) { \ - global_ray_config.logfile << MESSAGE << std::endl; \ - } else { \ - std::cout << MESSAGE << std::endl; \ - } \ - } - -#define RAY_CHECK(condition, message) \ - if (!(condition)) {\ - RAY_LOG(RAY_FATAL, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \ - } -#define RAY_WARN(condition, message) \ - if (!(condition)) {\ - RAY_LOG(RAY_INFO, "Check failed at line " << __LINE__ << " in " << __FILE__ << ": " << #condition << " with message " << message) \ - } - -#define RAY_CHECK_EQ(var1, var2, message) RAY_CHECK((var1) == (var2), message) -#define RAY_CHECK_NEQ(var1, var2, message) RAY_CHECK((var1) != (var2), message) -#define RAY_CHECK_LE(var1, var2, message) RAY_CHECK((var1) <= (var2), message) -#define RAY_CHECK_LT(var1, var2, message) RAY_CHECK((var1) < (var2), message) -#define RAY_CHECK_GE(var1, var2, message) RAY_CHECK((var1) >= (var2), message) -#define RAY_CHECK_GT(var1, var2, message) RAY_CHECK((var1) > (var2), message) - -#define RAY_CHECK_GRPC(expr) \ - do { \ - grpc::Status _s = (expr); \ - RAY_WARN(_s.ok(), "grpc call failed with message " << _s.error_message()); \ - } while (0); diff --git a/include/ray/ray.h b/include/ray/ray.h deleted file mode 100644 index 06fd0cf63..000000000 --- a/include/ray/ray.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef RAY_INCLUDE_RAY_H -#define RAY_INCLUDE_RAY_H - -#include -#include -#include -#include "logging.h" - -typedef size_t ObjectID; -typedef size_t WorkerId; -typedef size_t ObjStoreId; -typedef size_t OperationId; -typedef size_t SegmentId; // index into a memory segment table - -class FnInfo { - size_t num_return_vals_; - std::vector workers_; // `workers_` is a sorted vector -public: - void set_num_return_vals(size_t num) { - num_return_vals_ = num; - } - size_t num_return_vals() const { - return num_return_vals_; - } - void add_worker(WorkerId workerid) { - // insert `workerid` into `workers_` so that `workers_` stays sorted - workers_.insert(std::lower_bound(workers_.begin(), workers_.end(), workerid), workerid); - } - size_t num_workers() const { - return workers_.size(); - } - const std::vector& workers() const { - return workers_; - } -}; - -typedef std::vector > ObjTable; -typedef std::unordered_map FnTable; - -class objstore_not_registered_error : public std::runtime_error -{ -public: - objstore_not_registered_error(const std::string& msg) : std::runtime_error(msg) {} -}; - -#endif diff --git a/protos/graph.proto b/protos/graph.proto deleted file mode 100644 index 00ff96266..000000000 --- a/protos/graph.proto +++ /dev/null @@ -1,42 +0,0 @@ -syntax = "proto3"; - -message Arg { - uint64 objectid = 1; // The objectid for the argument. - string serialized_arg = 2; // A serialized representation of an argument passed by value. -} - -message Task { - string name = 1; // Name of the function call. Must not be empty. - repeated Arg arg = 2; // List of object IDs of the arguments to the function. - repeated uint64 result = 3; // Object IDs for result -} - -message Put { - uint64 objectid = 1; // The objectid for the object that was put -} - -message Get { - uint64 objectid = 1; // The objectid for the object that is retrieved -} - -// This is used internally by the scheduler. From the scheduler's perspective, -// the submission of tasks (via SubmitTask) and the submission of puts (via -// PutObj) look very similar, and so it is useful to be able to handle them -// together (for example in the computation graph). -message Operation { - Task task = 1; - Put put = 2; - Get get = 4; - uint64 creator_operationid = 3; // The id of the task that called this task or put. -} - -message TaskStatus { - uint64 operationid = 1; - string function_name = 2; - string worker_address = 3; - string error_message = 4; -} - -message CompGraph { - repeated Operation operation = 1; -} diff --git a/protos/ray.proto b/protos/ray.proto deleted file mode 100644 index 9a2c22b41..000000000 --- a/protos/ray.proto +++ /dev/null @@ -1,324 +0,0 @@ -// This file defines the GRPC interface between scheduler, object stores and -// workers. These are used for communication over the network. - -// Terminology: -// Worker: A cluster consists of multiple worker processes (typically one -// per core) which execute tasks that can access objects from object stores. -// Object store: Typically there is one object store per node which holds the -// objects locally stored on that node. -// Scheduler: The scheduler process keeps track of a mapping from object -// IDs to object stores, orchestrates data transfer between object -// stores and assigns tasks to workers. - -syntax = "proto3"; - -import "graph.proto"; -import "types.proto"; - -// Scheduler - -service Scheduler { - // Register a new worker with the scheduler - rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerReply); - // Register an object store with the scheduler - rpc RegisterObjStore(RegisterObjStoreRequest) returns (RegisterObjStoreReply); - // Tell the scheduler that a worker successfully imported a remote function. - rpc RegisterRemoteFunction(RegisterRemoteFunctionRequest) returns (AckReply); - // Asks the scheduler to execute a task, immediately returns an object ID to the result - rpc SubmitTask(SubmitTaskRequest) returns (SubmitTaskReply); - // Increment the count of the object ID - rpc IncrementCount(ChangeCountRequest) returns (AckReply); - // Decrement the count of the object ID - rpc DecrementCount(ChangeCountRequest) returns (AckReply); - // Request an object ID for an object that will be put in an object store - rpc PutObj(PutObjRequest) returns (PutObjReply); - // Request delivery of an object from an object store that holds the object to the local object store - rpc RequestObj(RequestObjRequest) returns (AckReply); - // Used by the worker to tell the scheduler that two objectids should refer to the same object - rpc AliasObjectIDs(AliasObjectIDsRequest) returns (AckReply); - // Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared) - rpc ObjReady(ObjReadyRequest) returns (AckReply); - // Increments the reference count of a particular object ID - rpc IncrementRefCount(IncrementRefCountRequest) returns (AckReply); - // Decrements the reference count of a particular object ID - rpc DecrementRefCount(DecrementRefCountRequest) returns (AckReply); - // Used by the worker to notify the scheduler about which objectids a particular object contains - rpc AddContainedObjectIDs(AddContainedObjectIDsRequest) returns (AckReply); - // Used by the worker to ask for work, this also returns the status of the previous task if there was one - rpc ReadyForNewTask(ReadyForNewTaskRequest) returns (AckReply); - // Get information about the scheduler state - rpc SchedulerInfo(SchedulerInfoRequest) returns (SchedulerInfoReply); - // Get information about tasks - rpc TaskInfo(TaskInfoRequest) returns (TaskInfoReply); - // Kills the workers - rpc KillWorkers(KillWorkersRequest) returns (KillWorkersReply); - // Run a function on all workers - rpc RunFunctionOnAllWorkers(RunFunctionOnAllWorkersRequest) returns (AckReply); - // Exports function to the workers - rpc ExportRemoteFunction(ExportRemoteFunctionRequest) returns (AckReply); - // Ship an initializer and reinitializer for a reusable variable to the workers - rpc ExportReusableVariable(ExportReusableVariableRequest) returns (AckReply); - // Notify the scheduler that a failure occurred while running a task, importing a remote function, or importing a reusable variable. - rpc NotifyFailure(NotifyFailureRequest) returns (AckReply); - // Polls the scheduler to see what objectids can be retrieved in the input list. - rpc Wait(WaitRequest) returns (WaitReply); -} - -message AckReply { -} - -message RegisterWorkerRequest { - string node_ip_address = 1; // The IP address of the node the worker is running on. - string worker_address = 2; // The address of the worker. - string objstore_address = 3; // The address of the object store the worker should connect to. If omitted, this will be assigned by the scheduler. - bool is_driver = 4; // True if the worker is a driver, and false otherwise. -} - -message RegisterWorkerReply { - uint64 workerid = 1; // Worker ID assigned by the scheduler - uint64 objstoreid = 2; // The Object store ID of the worker's local object store - string objstore_address = 3; // IP address of the object store the worker should connect to -} - -message RegisterObjStoreRequest { - string objstore_address = 1; // IP address of the object store being registered -} - -message RegisterObjStoreReply { - uint64 objstoreid = 1; // Object store ID assigned by the scheduler -} - -message RegisterRemoteFunctionRequest { - uint64 workerid = 1; // Worker that can execute the function - string function_name = 2; // Name of the remote function - uint64 num_return_vals = 3; // Number of return values of the function. This is only present if the function was successfully imported. -} - -message NotifyFailure { - Failure failure = 1; // The failure object. -} - -message SubmitTaskRequest { - uint64 workerid = 1; // The ID of the worker submitting the task - Task task = 2; // Contains name of the function to be executed and arguments -} - -message SubmitTaskReply { - repeated uint64 result = 1; // Object IDs of the function return values - bool function_registered = 2; // True if the function was registered, false otherwise - bool no_workers = 3; // True if no workers have registered with the scheduler, false otherwise -} - -message RequestObjRequest { - uint64 workerid = 1; // Worker that tries to request the object - uint64 objectid = 2; // Object ID of the object being requested -} - -message PutObjRequest { - uint64 workerid = 1; // Worker that tries to put an object -} - -message PutObjReply { - uint64 objectid = 1; // Object ID assigned by the scheduler to the object -} - -message AliasObjectIDsRequest { - uint64 alias_objectid = 1; // ObjectID which will be aliased - uint64 target_objectid = 2; // The target ObjectID -} - -message ObjReadyRequest { - uint64 objectid = 1; // Object ID of the object that has been finalized - uint64 objstoreid = 2; // ID of the object store the object lives on -} - -message IncrementRefCountRequest { - repeated uint64 objectid = 1; // Object IDs whose reference count should be incremented. Duplicates will be incremented multiple times. -} - -message AddContainedObjectIDsRequest { - uint64 objectid = 1; // The objectid of the object in question - repeated uint64 contained_objectid = 2; // Object IDs contained in the object -} - -message DecrementRefCountRequest { - repeated uint64 objectid = 1; // Object IDs whose reference count should be decremented. Duplicates will be decremented multiple times. -} - -message ReadyForNewTaskRequest { - uint64 workerid = 1; // ID of the worker which executed the task -} - -message ChangeCountRequest { - uint64 objectid = 1; // Object ID of the object whose reference count is increased or decreased -} - -// The following messages are used to get information about the scheduler state - -message SchedulerInfoRequest { -} - -message FnTableEntry { - repeated uint64 workerid = 1; // ID of the worker that can execute the function - uint64 num_return_vals = 2; // Number of return values of the function -} - -message SchedulerInfoReply { - repeated uint64 operationid = 1; // OperationIds of the tasks on the task queue - repeated uint64 avail_worker = 3; // List of workers waiting to get a task assigned - map function_table = 2; // Table of all available remote function - repeated uint64 target_objectid = 4; // The target_objectids_ data structure - repeated uint64 reference_count = 5; // The reference_counts_ data structure - CompGraph computation_graph = 6; // The computation graph constructed so far - repeated ObjstoreData objstore = 7; // Information about the object stores -} - -message WaitRequest { - repeated uint64 objectids = 1; // List of objectids to be checked. -} - -message WaitReply { - repeated uint64 indices = 1; // List of indices that correspond to objectids in the original list that are ready. -} - -// Object stores - -service ObjStore { - // Tell the object store to begin getting an object from another object store (called by the scheduler) - rpc StartDelivery(StartDeliveryRequest) returns (AckReply); - // Accept incoming data from another object store, as a stream of object chunks - rpc StreamObjTo(StreamObjToRequest) returns (stream ObjChunk); - // Notify the object store about objectid aliasing. This is called by the scheduler - rpc NotifyAlias(NotifyAliasRequest) returns (AckReply); - // Tell the object store to deallocate an object held by the object store. This is called by the scheduler. - rpc DeallocateObject(DeallocateObjectRequest) returns (AckReply); - // Get info about the object store state - rpc ObjStoreInfo(ObjStoreInfoRequest) returns (ObjStoreInfoReply); -} - -message StartDeliveryRequest { - string objstore_address = 1; // Object store to get the object from - uint64 objectid = 2; // ID of object that gets delivered -} - -message RegisterObjRequest { - uint64 objectid = 1; // ID of object that gets registered -} - -message RegisterObjReply { - uint64 handle = 1; // Handle to memory segment where object is stored -} - -message StreamObjToRequest { - uint64 objectid = 1; // Object ID of the object being streamed -} - -message ObjChunk { - uint64 total_size = 1; // Total size of the object - uint64 metadata_offset = 2; // Offset of the arrow metadata - bytes data = 3; // Data for this chunk of the object -} - -message NotifyAliasRequest { - uint64 alias_objectid = 1; // The objectid being aliased - uint64 canonical_objectid = 2; // The canonical objectid that points to the actual object -} - -message DeallocateObjectRequest { - uint64 canonical_objectid = 1; // The canonical objectid of the object to deallocate -} - -message GetObjRequest { - uint64 objectid = 1; // Object ID of the object being requested by the worker -} - -message TaskInfoRequest { -} - -message TaskInfoReply { - repeated TaskStatus failed_task = 1; // The tasks that have failed. - repeated TaskStatus running_task = 2; // The tasks that are currently running. - repeated Failure failed_remote_function_import = 3; // The remote function imports that failed. - repeated Failure failed_reusable_variable_import = 4; // The reusable variable imports that failed. - repeated Failure failed_reinitialize_reusable_variable = 5; // The reusable variable reinitializations that failed. - repeated Failure failed_function_to_run = 6; // The function to run on all workers that failed. -} - -message KillWorkersRequest { -} - -message KillWorkersReply { - bool success = 1; // Currently, the only reason to fail is if there are workers still executing tasks -} - -message RunFunctionOnAllWorkersRequest { - Function function = 1; -} - -message ExportRemoteFunctionRequest { - Function function = 1; -} - -message ExportReusableVariableRequest { - ReusableVar reusable_variable = 1; // The reusable variable to export. -} - -message NotifyFailureRequest { - Failure failure = 1; // The failure object. -} - -// These messages are for getting information about the object store state - -message ObjStoreInfoRequest { - repeated uint64 objectid = 1; // Object IDs we want to retrieve from the store for inspection -} - -message ObjStoreInfoReply { - repeated uint64 objectid = 1; // List of object IDs in the store - repeated Obj obj = 2; // Protocol buffer objects that were requested -} - -// Workers - -service WorkerService { - rpc ExecuteTask(ExecuteTaskRequest) returns (AckReply); // Scheduler calls a function from the worker - rpc RunFunctionOnWorker(RunFunctionOnWorkerRequest) returns (AckReply); // Runs a function on the worker. - rpc ImportRemoteFunction(ImportRemoteFunctionRequest) returns (AckReply); // Scheduler imports a function into the worker - rpc ImportReusableVariable(ImportReusableVariableRequest) returns (AckReply); // Scheduler imports a reusable variable into the worker - rpc Die(DieRequest) returns (AckReply); // Kills this worker - rpc PrintErrorMessage(PrintErrorMessageRequest) returns (AckReply); // Causes an error message to be printed. -} - -message ExecuteTaskRequest { - Task task = 1; // Contains name of the function to be executed and arguments -} - -message RunFunctionOnWorkerRequest { - Function function = 1; -} - -message ImportRemoteFunctionRequest { - Function function = 1; -} - -message ImportReusableVariableRequest { - ReusableVar reusable_variable = 1; // The reusable variable to export. -} - -message DieRequest { -} - -// This message is used by the worker service to send messages to the worker -// that are processed by the worker's main loop. -message WorkerMessage { - oneof worker_item { - Task task = 1; // A task for the worker to execute. - Function function = 2; // A remote function to import on the worker. - ReusableVar reusable_variable = 3; // A reusable variable to import on the worker. - Function function_to_run = 4; // An arbitrary function to run on the worker. - } -} - -message PrintErrorMessageRequest { - Failure failure = 1; // The failure object. -} diff --git a/protos/types.proto b/protos/types.proto deleted file mode 100644 index ecb16a321..000000000 --- a/protos/types.proto +++ /dev/null @@ -1,118 +0,0 @@ -syntax = "proto3"; - -message Int { - int64 data = 1; -} - -message Long { - int64 data = 1; -} - -message String { - bytes data = 1; -} - -message Unicode { - string data = 1; -} - -message Double { - double data = 1; -} - -// Empty used to represent a None object -message Empty { -} - -message Bool { - bool data = 1; -} - -message ObjID { - uint64 data = 1; -} - -message PyObj { - bytes data = 1; -} - -// Used for shipping remote functions to workers -message Function { - string name = 1; - bytes implementation = 2; -} - -message ReusableVar { - string name = 1; // The name of the reusable variable. - Function initializer = 2; // A serialized version of the function that initializes the reusable variable. - Function reinitializer = 3; // A serialized version of the function that reinitializes the reusable variable. -} - -enum FailedType { - FailedTask = 0; - FailedRemoteFunctionImport = 1; - FailedReusableVariableImport = 2; - FailedReinitializeReusableVariable = 3; - FailedFunctionToRun = 4; -} - -// Used to represent exceptions thrown in Python. This will happen when a task -// fails to execute, a remote function fails to be imported, or a reusable -// variable fails to be imported. -message Failure { - FailedType type = 1; // The type of the failure. - uint64 workerid = 2; // The id of the worker on which the failure occurred. - string worker_address = 3; // The address of the worker on which the failure occurred. This contains the same information as the workerid. - string name = 4; // The name of the failed object. - string error_message = 5; // The error message from the failure. -} - -message ObjstoreData { - uint64 objstoreid = 1; // The ID of the object store. - string address = 2; // The address of the object store. -} - -// Union of possible object types -message Obj { - String string_data = 1; - Unicode unicode_data = 13; - Int int_data = 2; - Long long_data = 12; - Double double_data = 3; - Bool bool_data = 10; - Tuple tuple_data = 7; - List list_data = 4; - Dict dict_data = 8; - Array array_data = 5; - Empty empty_data = 9; - ObjID objectid_data = 11; - PyObj pyobj_data = 6; -} - -message List { - repeated Obj elem = 1; -} - -message Tuple { - repeated Obj elem = 1; -} - -message DictEntry { - Obj key = 1; - Obj value = 2; -} - -message Dict { - repeated DictEntry elem = 1; -} - -message Array { - repeated uint64 shape = 1; - sint64 dtype = 2; - bool is_scalar = 8; - repeated double double_data = 3; - repeated float float_data = 4; - repeated sint64 int_data = 5; - repeated uint64 uint_data = 6; - repeated uint64 objectid_data = 7; -} diff --git a/scripts/gen-protobuf.sh b/scripts/gen-protobuf.sh deleted file mode 100755 index 9a5ceed58..000000000 --- a/scripts/gen-protobuf.sh +++ /dev/null @@ -1,7 +0,0 @@ -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" - -# TODO(mehrdad): How would this look in windows, where does the protoc executable go? -# On Linux, we compile it ourselves, on Windows we might not want to do that (?) -mkdir -p $DIR/../lib/python/ray/internal/ -$DIR/../thirdparty/grpc/bins/opt/protobuf/protoc -I ../protos/ --python_out=$DIR/../lib/python/ray/internal/ ../protos/graph.proto -$DIR/../thirdparty/grpc/bins/opt/protobuf/protoc -I ../protos/ --python_out=$DIR/../lib/python/ray/internal/ ../protos/types.proto