diff --git a/CMakeLists.txt b/CMakeLists.txt index 419ada6f5..fb0e5990c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 2.8) -project(halo) +project(ray) set(THIRDPARTY_DIR "${CMAKE_SOURCE_DIR}/thirdparty") @@ -23,15 +23,15 @@ include_directories("${NUMPY_INCLUDE_DIR}") set(PROTO_PATH "${CMAKE_SOURCE_DIR}/protos") -set(HALO_PROTO "${PROTO_PATH}/halo.proto") +set(RAY_PROTO "${PROTO_PATH}/ray.proto") set(TYPES_PROTO "${PROTO_PATH}/types.proto") set(GENERATED_PROTOBUF_PATH "${CMAKE_BINARY_DIR}/generated") file(MAKE_DIRECTORY ${GENERATED_PROTOBUF_PATH}) -set(HALO_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/halo.pb.cc") -set(HALO_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/halo.pb.h") -set(HALO_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/halo.grpc.pb.cc") -set(HALO_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/halo.grpc.pb.h") +set(RAY_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.cc") +set(RAY_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.h") +set(RAY_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.cc") +set(RAY_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.h") set(TYPES_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.cc") set(TYPES_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.h") @@ -39,19 +39,19 @@ set(TYPES_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.cc") set(TYPES_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.h") add_custom_command( - OUTPUT "${HALO_PB_H_FILE}" - "${HALO_PB_CPP_FILE}" - "${HALO_GRPC_PB_H_FILE}" - "${HALO_GRPC_PB_CPP_FILE}" + OUTPUT "${RAY_PB_H_FILE}" + "${RAY_PB_CPP_FILE}" + "${RAY_GRPC_PB_H_FILE}" + "${RAY_GRPC_PB_CPP_FILE}" COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc ARGS "--proto_path=${PROTO_PATH}" "--cpp_out=${GENERATED_PROTOBUF_PATH}" - "${HALO_PROTO}" + "${RAY_PROTO}" COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc ARGS "--proto_path=${PROTO_PATH}" "--grpc_out=${GENERATED_PROTOBUF_PATH}" "--plugin=protoc-gen-grpc=${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/grpc_cpp_plugin" - "${HALO_PROTO}" + "${RAY_PROTO}" ) add_custom_command( @@ -70,8 +70,8 @@ add_custom_command( "${TYPES_PROTO}" ) -set(GENERATED_PROTOBUF_FILES ${HALO_PB_H_FILE} ${HALO_PB_CPP_FILE} - ${HALO_GRPC_PB_H_FILE} ${HALO_GRPC_PB_CPP_FILE} +set(GENERATED_PROTOBUF_FILES ${RAY_PB_H_FILE} ${RAY_PB_CPP_FILE} + ${RAY_GRPC_PB_H_FILE} ${RAY_GRPC_PB_CPP_FILE} ${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE} ${TYPES_GRPC_PB_H_FILE} ${TYPES_GRPC_PB_CPP_FILE}) @@ -110,7 +110,7 @@ target_link_libraries(pynumbuf ${ARROW_LIB} ${PYTHON_LIBRARIES}) add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) target_link_libraries(objstore ${ARROW_LIB} pynumbuf) add_executable(scheduler src/scheduler.cc src/computation_graph.cc ${GENERATED_PROTOBUF_FILES}) -add_library(halolib SHARED src/halolib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(halolib ${ARROW_LIB} pynumbuf) +add_library(raylib SHARED src/raylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(raylib ${ARROW_LIB} pynumbuf) -install(TARGETS objstore scheduler halolib DESTINATION ${CMAKE_SOURCE_DIR}/lib/python/halo) +install(TARGETS objstore scheduler raylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/python/ray) diff --git a/README.md b/README.md index 13107d5bb..041f03b8a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Halo +# Ray -Halo is a distributed execution framework with a Python-like programming model. +Ray is a distributed execution framework with a Python-like programming model. ## Design Decisions @@ -14,6 +14,6 @@ For a description of our design decisions, see 1. sudo apt-get update 2. sudo apt-get install git -3. git clone https://github.com/amplab/halo.git -4. cd halo +3. git clone https://github.com/amplab/ray.git +4. cd ray 5. bash setup.sh diff --git a/doc/aliasing.md b/doc/aliasing.md index e51fd1b97..e88e928bf 100644 --- a/doc/aliasing.md +++ b/doc/aliasing.md @@ -1,6 +1,6 @@ # Aliasing -An important feature of Halo is that a remote call sent to the scheduler +An important feature of Ray is that a remote call sent to the scheduler immediately returns object references to the outputs of the task, and the actual outputs of the task are only associated with the relevant object references after the task has been executed and the outputs have been computed. This allows @@ -10,15 +10,15 @@ However, to provide a more flexible API, we allow tasks to not only return values, but to also return object references to values. As an examples, consider the following code. ```python -@halo.remote([], [np.ndarray]) +@ray.remote([], [np.ndarray]) def f() return np.zeros(5) -@halo.remote([], [np.ndarray]) +@ray.remote([], [np.ndarray]) def g() return f() -@halo.remote([], [np.ndarray]) +@ray.remote([], [np.ndarray]) def h() return g() ``` diff --git a/doc/reference-counting.md b/doc/reference-counting.md index 7a452297d..dc4cb4b56 100644 --- a/doc/reference-counting.md +++ b/doc/reference-counting.md @@ -1,6 +1,6 @@ # Reference Counting -In Halo, each object is assigned a globally unique object reference by the +In Ray, each object is assigned a globally unique object reference by the scheduler (starting with 0 and incrementing upward). The objects are stored in object stores. In order to avoid running out of memory, the object stores must know when it is ok to deallocate an object. Since a worker on one node may have @@ -11,7 +11,7 @@ information. ## Reference Counting Two approaches to reclaiming memory are garbage collection and reference -counting. We choose to use a reference counting approach in Halo. There are a +counting. We choose to use a reference counting approach in Ray. There are a couple of reasons for this. Reference counting allows us to reclaim memory as early as possible. It also avoids pausing the system for garbage collection. We also note that implementing reference counting at the cluster level plays nicely @@ -77,13 +77,13 @@ because they must be passed into `AliasObjRefs` at some point). The following problem has not yet been resolved. In the following code, the result `x` will be garbage. ```python -x = halo.pull(ra.zeros([10, 10], "float")) +x = ray.pull(ra.zeros([10, 10], "float")) ``` When `ra.zeros` is called, a worker will create an array of zeros and store it in an object store. An object reference to the output is returned. The call -to `halo.pull` will not copy data from the object store process to the worker +to `ray.pull` will not copy data from the object store process to the worker process, but will instead give the worker process a pointer to shared memory. -After the `halo.pull` call completes, the object reference returned by +After the `ray.pull` call completes, the object reference returned by `ra.zeros` will go out of scope, and the object it refers to will be deallocated from the object store. This will cause the memory that `x` points to to be garbage. diff --git a/doc/scheduler.md b/doc/scheduler.md index 17a8ec12a..97187619e 100644 --- a/doc/scheduler.md +++ b/doc/scheduler.md @@ -1,6 +1,6 @@ # Scheduler -The scheduling strategies currently implemented in Halo are fairly basic and +The scheduling strategies currently implemented in Ray are fairly basic and all use a central scheduler. * The naive scheduler assigns tasks to workers just taking into account diff --git a/include/halo/halo.h b/include/ray/ray.h similarity index 77% rename from include/halo/halo.h rename to include/ray/ray.h index 3fbded72d..37de825a6 100644 --- a/include/halo/halo.h +++ b/include/ray/ray.h @@ -1,5 +1,5 @@ -#ifndef HALO_INCLUDE_HALO_H -#define HALO_INCLUDE_HALO_H +#ifndef RAY_INCLUDE_RAY_H +#define RAY_INCLUDE_RAY_H #include #include @@ -34,20 +34,20 @@ public: typedef std::vector > ObjTable; typedef std::unordered_map FnTable; -#define HALO_VERBOSE -1 -#define HALO_INFO 0 -#define HALO_DEBUG 1 -#define HALO_FATAL 2 -#define HALO_REFCOUNT HALO_VERBOSE -#define HALO_ALIAS HALO_VERBOSE +#define RAY_VERBOSE -1 +#define RAY_INFO 0 +#define RAY_DEBUG 1 +#define RAY_FATAL 2 +#define RAY_REFCOUNT RAY_VERBOSE +#define RAY_ALIAS RAY_VERBOSE -#define HALO_LOG(LEVEL, MESSAGE) \ - if (LEVEL == HALO_VERBOSE) { \ +#define RAY_LOG(LEVEL, MESSAGE) \ + if (LEVEL == RAY_VERBOSE) { \ \ - } else if (LEVEL == HALO_FATAL) { \ + } else if (LEVEL == RAY_FATAL) { \ std::cerr << "fatal error occured: " << MESSAGE << std::endl; \ std::exit(1); \ - } else if (LEVEL == HALO_DEBUG) { \ + } else if (LEVEL == RAY_DEBUG) { \ \ } else { \ std::cout << MESSAGE << std::endl; \ diff --git a/lib/python/halo/__init__.py b/lib/python/ray/__init__.py similarity index 81% rename from lib/python/halo/__init__.py rename to lib/python/ray/__init__.py index 0e727db61..9f8f3132d 100644 --- a/lib/python/halo/__init__.py +++ b/lib/python/ray/__init__.py @@ -1,3 +1,3 @@ -import libhalolib as lib +import libraylib as lib import serialization from worker import scheduler_info, register_module, connect, disconnect, pull, push, remote diff --git a/lib/python/halo/arrays/__init__.py b/lib/python/ray/arrays/__init__.py similarity index 100% rename from lib/python/halo/arrays/__init__.py rename to lib/python/ray/arrays/__init__.py diff --git a/lib/python/halo/arrays/distributed/__init__.py b/lib/python/ray/arrays/distributed/__init__.py similarity index 100% rename from lib/python/halo/arrays/distributed/__init__.py rename to lib/python/ray/arrays/distributed/__init__.py diff --git a/lib/python/halo/arrays/distributed/core.py b/lib/python/ray/arrays/distributed/core.py similarity index 90% rename from lib/python/halo/arrays/distributed/core.py rename to lib/python/ray/arrays/distributed/core.py index 8a0a3d7f9..be57ba235 100644 --- a/lib/python/halo/arrays/distributed/core.py +++ b/lib/python/ray/arrays/distributed/core.py @@ -1,7 +1,7 @@ from typing import List import numpy as np -import halo.arrays.remote as ra -import halo +import ray.arrays.remote as ra +import ray __all__ = ["BLOCK_SIZE", "DistArray", "assemble", "zeros", "ones", "copy", "eye", "triu", "tril", "blockwise_dot", "dot", "transpose", "add", "subtract", "numpy_to_dist", "subblocks"] @@ -55,13 +55,13 @@ class DistArray(object): def assemble(self): """Assemble an array on this node from a distributed array object reference.""" - first_block = halo.pull(self.objrefs[(0,) * self.ndim]) + first_block = ray.pull(self.objrefs[(0,) * self.ndim]) dtype = first_block.dtype result = np.zeros(self.shape, dtype=dtype) for index in np.ndindex(*self.num_blocks): lower = DistArray.compute_block_lower(index, self.shape) upper = DistArray.compute_block_upper(index, self.shape) - result[[slice(l, u) for (l, u) in zip(lower, upper)]] = halo.pull(self.objrefs[index]) + result[[slice(l, u) for (l, u) in zip(lower, upper)]] = ray.pull(self.objrefs[index]) return result def __getitem__(self, sliced): @@ -69,42 +69,42 @@ class DistArray(object): a = self.assemble() return a[sliced] -@halo.remote([DistArray], [np.ndarray]) +@ray.remote([DistArray], [np.ndarray]) def assemble(a): return a.assemble() # TODO(rkn): what should we call this method -@halo.remote([np.ndarray], [DistArray]) +@ray.remote([np.ndarray], [DistArray]) def numpy_to_dist(a): result = DistArray(a.shape) for index in np.ndindex(*result.num_blocks): lower = DistArray.compute_block_lower(index, a.shape) upper = DistArray.compute_block_upper(index, a.shape) - result.objrefs[index] = halo.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) + result.objrefs[index] = ray.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) return result -@halo.remote([List[int], str], [DistArray]) +@ray.remote([List[int], str], [DistArray]) def zeros(shape, dtype_name="float"): result = DistArray(shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = ra.zeros(DistArray.compute_block_shape(index, shape), dtype_name=dtype_name) return result -@halo.remote([List[int], str], [DistArray]) +@ray.remote([List[int], str], [DistArray]) def ones(shape, dtype_name="float"): result = DistArray(shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = ra.ones(DistArray.compute_block_shape(index, shape), dtype_name=dtype_name) return result -@halo.remote([DistArray], [DistArray]) +@ray.remote([DistArray], [DistArray]) def copy(a): result = DistArray(a.shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = a.objrefs[index] # We don't need to actually copy the objects because cluster-level objects are assumed to be immutable. return result -@halo.remote([int, int, str], [DistArray]) +@ray.remote([int, int, str], [DistArray]) def eye(dim1, dim2=-1, dtype_name="float"): dim2 = dim1 if dim2 == -1 else dim2 shape = [dim1, dim2] @@ -117,7 +117,7 @@ def eye(dim1, dim2=-1, dtype_name="float"): result.objrefs[i, j] = ra.zeros(block_shape, dtype_name=dtype_name) return result -@halo.remote([DistArray], [DistArray]) +@ray.remote([DistArray], [DistArray]) def triu(a): if a.ndim != 2: raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim)) @@ -131,7 +131,7 @@ def triu(a): result.objrefs[i, j] = ra.zeros_like(a.objrefs[i, j]) return result -@halo.remote([DistArray], [DistArray]) +@ray.remote([DistArray], [DistArray]) def tril(a): if a.ndim != 2: raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim)) @@ -145,7 +145,7 @@ def tril(a): result.objrefs[i, j] = ra.zeros_like(a.objrefs[i, j]) return result -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def blockwise_dot(*matrices): n = len(matrices) if n % 2 != 0: @@ -156,7 +156,7 @@ def blockwise_dot(*matrices): result += np.dot(matrices[i], matrices[n / 2 + i]) return result -@halo.remote([DistArray, DistArray], [DistArray]) +@ray.remote([DistArray, DistArray], [DistArray]) def dot(a, b): if a.ndim != 2: raise Exception("dot expects its arguments to be 2-dimensional, but a.ndim = {}.".format(a.ndim)) @@ -171,7 +171,7 @@ def dot(a, b): result.objrefs[i, j] = blockwise_dot(*args) return result -@halo.remote([DistArray, List[int]], [DistArray]) +@ray.remote([DistArray, List[int]], [DistArray]) def subblocks(a, *ranges): """ This function produces a distributed array from a subset of the blocks in the `a`. The result and `a` will have the same number of dimensions.For example, @@ -202,7 +202,7 @@ def subblocks(a, *ranges): result.objrefs[index] = a.objrefs[tuple([ranges[i][index[i]] for i in range(a.ndim)])] return result -@halo.remote([DistArray], [DistArray]) +@ray.remote([DistArray], [DistArray]) def transpose(a): if a.ndim != 2: raise Exception("transpose expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape)) @@ -213,7 +213,7 @@ def transpose(a): return result # TODO(rkn): support broadcasting? -@halo.remote([DistArray, DistArray], [DistArray]) +@ray.remote([DistArray, DistArray], [DistArray]) def add(x1, x2): if x1.shape != x2.shape: raise Exception("add expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape)) @@ -223,7 +223,7 @@ def add(x1, x2): return result # TODO(rkn): support broadcasting? -@halo.remote([DistArray, DistArray], [DistArray]) +@ray.remote([DistArray, DistArray], [DistArray]) def subtract(x1, x2): if x1.shape != x2.shape: raise Exception("subtract expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape)) diff --git a/lib/python/halo/arrays/distributed/linalg.py b/lib/python/ray/arrays/distributed/linalg.py similarity index 82% rename from lib/python/halo/arrays/distributed/linalg.py rename to lib/python/ray/arrays/distributed/linalg.py index a04fb92f1..909a1b4d6 100644 --- a/lib/python/halo/arrays/distributed/linalg.py +++ b/lib/python/ray/arrays/distributed/linalg.py @@ -1,14 +1,14 @@ from typing import List import numpy as np -import halo.arrays.remote as ra -import halo +import ray.arrays.remote as ra +import ray from core import * __all__ = ["tsqr", "modified_lu", "tsqr_hr", "qr"] -@halo.remote([DistArray], [DistArray, np.ndarray]) +@ray.remote([DistArray], [DistArray, np.ndarray]) def tsqr(a): """ arguments: @@ -17,10 +17,10 @@ def tsqr(a): a.shape == (M, N) K == min(M, N) return values: - q: DistArray, if q_full = halo.context.pull(DistArray, q).assemble(), then + q: DistArray, if q_full = ray.context.pull(DistArray, q).assemble(), then q_full.shape == (M, K) np.allclose(np.dot(q_full.T, q_full), np.eye(K)) == True - r: np.ndarray, if r_val = halo.context.pull(np.ndarray, r), then + r: np.ndarray, if r_val = ray.context.pull(np.ndarray, r), then r_val.shape == (K, N) np.allclose(r, np.triu(r)) == True """ @@ -80,7 +80,7 @@ def tsqr(a): return q_result, r # TODO(rkn): This is unoptimized, we really want a block version of this. -@halo.remote([DistArray], [DistArray, np.ndarray, np.ndarray]) +@ray.remote([DistArray], [DistArray, np.ndarray, np.ndarray]) def modified_lu(q): """ Algorithm 5 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf @@ -108,39 +108,39 @@ def modified_lu(q): for i in range(b): L[i, i] = 1 U = np.triu(q_work)[:b, :] - return numpy_to_dist(halo.push(L)), U, S # TODO(rkn): get rid of push and pull + return numpy_to_dist(ray.push(L)), U, S # TODO(rkn): get rid of push and pull -@halo.remote([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray]) +@ray.remote([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray]) def tsqr_hr_helper1(u, s, y_top_block, b): y_top = y_top_block[:b, :b] s_full = np.diag(s) t = -1 * np.dot(u, np.dot(s_full, np.linalg.inv(y_top).T)) return t, y_top -@halo.remote([np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray], [np.ndarray]) def tsqr_hr_helper2(s, r_temp): s_full = np.diag(s) return np.dot(s_full, r_temp) -@halo.remote([DistArray], [DistArray, np.ndarray, np.ndarray, np.ndarray]) +@ray.remote([DistArray], [DistArray, np.ndarray, np.ndarray, np.ndarray]) def tsqr_hr(a): """Algorithm 6 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" q, r_temp = tsqr(a) y, u, s = modified_lu(q) - y_blocked = halo.pull(y) + y_blocked = ray.pull(y) t, y_top = tsqr_hr_helper1(u, s, y_blocked.objrefs[0, 0], a.shape[1]) r = tsqr_hr_helper2(s, r_temp) return y, t, y_top, r -@halo.remote([np.ndarray, np.ndarray, np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray, np.ndarray, np.ndarray], [np.ndarray]) def qr_helper1(a_rc, y_ri, t, W_c): return a_rc - np.dot(y_ri, np.dot(t.T, W_c)) -@halo.remote([np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray], [np.ndarray]) def qr_helper2(y_ri, a_rc): return np.dot(y_ri.T, a_rc) -@halo.remote([DistArray], [DistArray, DistArray]) +@ray.remote([DistArray], [DistArray, DistArray]) def qr(a): """Algorithm 7 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" m, n = a.shape[0], a.shape[1] @@ -150,21 +150,21 @@ def qr(a): a_work = DistArray() a_work.construct(a.shape, np.copy(a.objrefs)) - result_dtype = np.linalg.qr(halo.pull(a.objrefs[0, 0]))[0].dtype.name - r_res = halo.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. - y_res = halo.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + result_dtype = np.linalg.qr(ray.pull(a.objrefs[0, 0]))[0].dtype.name + r_res = ray.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + y_res = ray.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. Ts = [] for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0] sub_dist_array = subblocks(a_work, range(i, a_work.num_blocks[0]), [i]) y, t, _, R = tsqr_hr(sub_dist_array) - y_val = halo.pull(y) + y_val = ray.pull(y) for j in range(i, a.num_blocks[0]): y_res.objrefs[j, i] = y_val.objrefs[j - i, 0] if a.shape[0] > a.shape[1]: # in this case, R needs to be square - R_shape = halo.pull(ra.shape(R)) + R_shape = ray.pull(ra.shape(R)) eye_temp = ra.eye(R_shape[1], R_shape[0], dtype_name=result_dtype) r_res.objrefs[i, i] = ra.dot(eye_temp, R) else: diff --git a/lib/python/halo/arrays/distributed/random.py b/lib/python/ray/arrays/distributed/random.py similarity index 82% rename from lib/python/halo/arrays/distributed/random.py rename to lib/python/ray/arrays/distributed/random.py index 5ca9b38d9..e10579eda 100644 --- a/lib/python/halo/arrays/distributed/random.py +++ b/lib/python/ray/arrays/distributed/random.py @@ -1,12 +1,12 @@ from typing import List import numpy as np -import halo.arrays.remote as ra -import halo +import ray.arrays.remote as ra +import ray from core import * -@halo.remote([List[int]], [DistArray]) +@ray.remote([List[int]], [DistArray]) def normal(shape): num_blocks = DistArray.compute_num_blocks(shape) objrefs = np.empty(num_blocks, dtype=object) diff --git a/lib/python/halo/arrays/remote/__init__.py b/lib/python/ray/arrays/remote/__init__.py similarity index 100% rename from lib/python/halo/arrays/remote/__init__.py rename to lib/python/ray/arrays/remote/__init__.py diff --git a/lib/python/halo/arrays/remote/core.py b/lib/python/ray/arrays/remote/core.py similarity index 65% rename from lib/python/halo/arrays/remote/core.py rename to lib/python/ray/arrays/remote/core.py index 4487f72c9..c169e7431 100644 --- a/lib/python/halo/arrays/remote/core.py +++ b/lib/python/ray/arrays/remote/core.py @@ -1,77 +1,77 @@ from typing import List import numpy as np -import halo +import ray __all__ = ["zeros", "zeros_like", "ones", "eye", "dot", "vstack", "hstack", "subarray", "copy", "tril", "triu", "diag", "transpose", "add", "subtract", "sum", "shape"] -@halo.remote([List[int], str, str], [np.ndarray]) +@ray.remote([List[int], str, str], [np.ndarray]) def zeros(shape, dtype_name="float", order="C"): return np.zeros(shape, dtype=np.dtype(dtype_name), order=order) -@halo.remote([np.ndarray, str, str, bool], [np.ndarray]) +@ray.remote([np.ndarray, str, str, bool], [np.ndarray]) def zeros_like(a, dtype_name="None", order="K", subok=True): dtype_val = None if dtype_name == "None" else np.dtype(dtype_name) return np.zeros_like(a, dtype=dtype_val, order=order, subok=subok) -@halo.remote([List[int], str, str], [np.ndarray]) +@ray.remote([List[int], str, str], [np.ndarray]) def ones(shape, dtype_name="float", order="C"): return np.ones(shape, dtype=np.dtype(dtype_name), order=order) -@halo.remote([int, int, int, str], [np.ndarray]) +@ray.remote([int, int, int, str], [np.ndarray]) def eye(N, M=-1, k=0, dtype_name="float"): M = N if M == -1 else M return np.eye(N, M=M, k=k, dtype=np.dtype(dtype_name)) -@halo.remote([np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray], [np.ndarray]) def dot(a, b): return np.dot(a, b) -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def vstack(*xs): return np.vstack(xs) -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def hstack(*xs): return np.hstack(xs) # TODO(rkn): instead of this, consider implementing slicing -@halo.remote([np.ndarray, List[int], List[int]], [np.ndarray]) +@ray.remote([np.ndarray, List[int], List[int]], [np.ndarray]) def subarray(a, lower_indices, upper_indices): # TODO(rkn): be consistent about using "index" versus "indices" return a[[slice(l, u) for (l, u) in zip(lower_indices, upper_indices)]] -@halo.remote([np.ndarray, str], [np.ndarray]) +@ray.remote([np.ndarray, str], [np.ndarray]) def copy(a, order="K"): return np.copy(a, order=order) -@halo.remote([np.ndarray, int], [np.ndarray]) +@ray.remote([np.ndarray, int], [np.ndarray]) def tril(m, k=0): return np.tril(m, k=k) -@halo.remote([np.ndarray, int], [np.ndarray]) +@ray.remote([np.ndarray, int], [np.ndarray]) def triu(m, k=0): return np.triu(m, k=k) -@halo.remote([np.ndarray, int], [np.ndarray]) +@ray.remote([np.ndarray, int], [np.ndarray]) def diag(v, k=0): return np.diag(v, k=k) -@halo.remote([np.ndarray, List[int]], [np.ndarray]) +@ray.remote([np.ndarray, List[int]], [np.ndarray]) def transpose(a, axes=[]): axes = None if axes == [] else axes return np.transpose(a, axes=axes) -@halo.remote([np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray], [np.ndarray]) def add(x1, x2): return np.add(x1, x2) -@halo.remote([np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray], [np.ndarray]) def subtract(x1, x2): return np.subtract(x1, x2) -@halo.remote([int, np.ndarray], [np.ndarray]) +@ray.remote([int, np.ndarray], [np.ndarray]) def sum(axis, *xs): return np.sum(xs, axis=axis) -@halo.remote([np.ndarray], [tuple]) +@ray.remote([np.ndarray], [tuple]) def shape(a): return np.shape(a) diff --git a/lib/python/halo/arrays/remote/linalg.py b/lib/python/ray/arrays/remote/linalg.py similarity index 56% rename from lib/python/halo/arrays/remote/linalg.py rename to lib/python/ray/arrays/remote/linalg.py index f7d7fa6b2..f79e94684 100644 --- a/lib/python/halo/arrays/remote/linalg.py +++ b/lib/python/ray/arrays/remote/linalg.py @@ -1,88 +1,88 @@ from typing import List import numpy as np -import halo +import ray __all__ = ["matrix_power", "solve", "tensorsolve", "tensorinv", "inv", "cholesky", "eigvals", "eigvalsh", "pinv", "slogdet", "det", "svd", "eig", "eigh", "lstsq", "norm", "qr", "cond", "matrix_rank", "LinAlgError", "multi_dot"] -@halo.remote([np.ndarray, int], [np.ndarray]) +@ray.remote([np.ndarray, int], [np.ndarray]) def matrix_power(M, n): return np.linalg.matrix_power(M, n) -@halo.remote([np.ndarray, np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray, np.ndarray], [np.ndarray]) def solve(a, b): return np.linalg.solve(a, b) -@halo.remote([np.ndarray], [np.ndarray, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray]) def tensorsolve(a): raise NotImplementedError -@halo.remote([np.ndarray], [np.ndarray, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray]) def tensorinv(a): raise NotImplementedError -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def inv(a): return np.linalg.inv(a) -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def cholesky(a): return np.linalg.cholesky(a) -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def eigvals(a): return np.linalg.eigvals(a) -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def eigvalsh(a): raise NotImplementedError -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def pinv(a): return np.linalg.pinv(a) -@halo.remote([np.ndarray], [int]) +@ray.remote([np.ndarray], [int]) def slogdet(a): raise NotImplementedError -@halo.remote([np.ndarray], [float]) +@ray.remote([np.ndarray], [float]) def det(a): return np.linalg.det(a) -@halo.remote([np.ndarray], [np.ndarray, np.ndarray, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray, np.ndarray]) def svd(a): return np.linalg.svd(a) -@halo.remote([np.ndarray], [np.ndarray, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray]) def eig(a): return np.linalg.eig(a) -@halo.remote([np.ndarray], [np.ndarray, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray]) def eigh(a): return np.linalg.eigh(a) -@halo.remote([np.ndarray], [np.ndarray, np.ndarray, int, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray, int, np.ndarray]) def lstsq(a, b): return np.linalg.lstsq(a) -@halo.remote([np.ndarray], [float]) +@ray.remote([np.ndarray], [float]) def norm(x): return np.linalg.norm(x) -@halo.remote([np.ndarray], [np.ndarray, np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray, np.ndarray]) def qr(a): return np.linalg.qr(a) -@halo.remote([np.ndarray], [float]) +@ray.remote([np.ndarray], [float]) def cond(x): return np.linalg.cond(x) -@halo.remote([np.ndarray], [int]) +@ray.remote([np.ndarray], [int]) def matrix_rank(M): return np.linalg.matrix_rank(M) -@halo.remote([np.ndarray], [np.ndarray]) +@ray.remote([np.ndarray], [np.ndarray]) def multi_dot(*a): raise NotImplementedError diff --git a/lib/python/halo/arrays/remote/random.py b/lib/python/ray/arrays/remote/random.py similarity index 66% rename from lib/python/halo/arrays/remote/random.py rename to lib/python/ray/arrays/remote/random.py index ed435a046..7c2a6419c 100644 --- a/lib/python/halo/arrays/remote/random.py +++ b/lib/python/ray/arrays/remote/random.py @@ -1,7 +1,7 @@ from typing import List import numpy as np -import halo +import ray -@halo.remote([List[int]], [np.ndarray]) +@ray.remote([List[int]], [np.ndarray]) def normal(shape): return np.random.normal(size=shape) diff --git a/lib/python/halo/datasets/__init__.py b/lib/python/ray/datasets/__init__.py similarity index 100% rename from lib/python/halo/datasets/__init__.py rename to lib/python/ray/datasets/__init__.py diff --git a/lib/python/halo/serialization.py b/lib/python/ray/serialization.py similarity index 60% rename from lib/python/halo/serialization.py rename to lib/python/ray/serialization.py index 815b8ca7a..f15a0f9d9 100644 --- a/lib/python/halo/serialization.py +++ b/lib/python/ray/serialization.py @@ -1,6 +1,6 @@ import importlib -import halo +import ray def to_primitive(obj): if hasattr(obj, "serialize"): @@ -22,18 +22,18 @@ def from_primitive(primitive_obj): def serialize(worker_capsule, obj): primitive_obj = to_primitive(obj) - obj_capsule, contained_objrefs = halo.lib.serialize_object(worker_capsule, primitive_obj) # contained_objrefs is a list of the objrefs contained in obj + obj_capsule, contained_objrefs = ray.lib.serialize_object(worker_capsule, primitive_obj) # contained_objrefs is a list of the objrefs contained in obj return obj_capsule, contained_objrefs def deserialize(worker_capsule, capsule): - primitive_obj = halo.lib.deserialize_object(worker_capsule, capsule) + primitive_obj = ray.lib.deserialize_object(worker_capsule, capsule) return from_primitive(primitive_obj) def serialize_task(worker_capsule, func_name, args): - primitive_args = [(arg if isinstance(arg, halo.lib.ObjRef) else to_primitive(arg)) for arg in args] - return halo.lib.serialize_task(worker_capsule, func_name, primitive_args) + primitive_args = [(arg if isinstance(arg, ray.lib.ObjRef) else to_primitive(arg)) for arg in args] + return ray.lib.serialize_task(worker_capsule, func_name, primitive_args) def deserialize_task(worker_capsule, task): - func_name, primitive_args, return_objrefs = halo.lib.deserialize_task(worker_capsule, task) - args = [(arg if isinstance(arg, halo.lib.ObjRef) else from_primitive(arg)) for arg in primitive_args] + func_name, primitive_args, return_objrefs = ray.lib.deserialize_task(worker_capsule, task) + args = [(arg if isinstance(arg, ray.lib.ObjRef) else from_primitive(arg)) for arg in primitive_args] return func_name, args, return_objrefs diff --git a/lib/python/halo/services.py b/lib/python/ray/services.py similarity index 92% rename from lib/python/halo/services.py rename to lib/python/ray/services.py index 523225924..d7cc98b72 100644 --- a/lib/python/halo/services.py +++ b/lib/python/ray/services.py @@ -3,8 +3,8 @@ import os import atexit import time -import halo -import halo.worker as worker +import ray +import ray.worker as worker _services_path = os.path.dirname(os.path.abspath(__file__)) @@ -58,9 +58,9 @@ def cleanup(): global drivers for driver in drivers: - halo.disconnect(driver) + ray.disconnect(driver) if len(drivers) == 0: - halo.disconnect() + ray.disconnect() drivers = [] # atexit.register(cleanup) @@ -97,7 +97,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None for _ in range(num_workers): start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.3) - halo.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) + ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.5) def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None): @@ -124,11 +124,11 @@ def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_ driver_workers = [] for i in range(num_objstores): driver_worker = worker.Worker() - halo.connect(scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()), driver_worker) + ray.connect(scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()), driver_worker) driver_workers.append(driver_worker) drivers.append(driver_worker) time.sleep(0.5) return driver_workers else: - halo.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port())) + ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port())) time.sleep(0.5) diff --git a/lib/python/halo/worker.py b/lib/python/ray/worker.py similarity index 89% rename from lib/python/halo/worker.py rename to lib/python/ray/worker.py index 334d99bbe..3ff0e56f9 100644 --- a/lib/python/halo/worker.py +++ b/lib/python/ray/worker.py @@ -4,7 +4,7 @@ import funcsigs import numpy as np import pynumbuf -import halo +import ray import serialization class Worker(object): @@ -17,10 +17,10 @@ class Worker(object): def put_object(self, objref, value): """Put `value` in the local object store with objref `objref`. This assumes that the value for `objref` has not yet been placed in the local object store.""" if pynumbuf.serializable(value): - halo.lib.put_arrow(self.handle, objref, value) + ray.lib.put_arrow(self.handle, objref, value) else: object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule - halo.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) + ray.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) def get_object(self, objref): """ @@ -29,32 +29,32 @@ class Worker(object): WARNING: get_object can only be called on a canonical objref. """ - if halo.lib.is_arrow(self.handle, objref): - return halo.lib.get_arrow(self.handle, objref) + if ray.lib.is_arrow(self.handle, objref): + return ray.lib.get_arrow(self.handle, objref) else: - object_capsule = halo.lib.get_object(self.handle, objref) + object_capsule = ray.lib.get_object(self.handle, objref) return serialization.deserialize(self.handle, object_capsule) def alias_objrefs(self, alias_objref, target_objref): """Make `alias_objref` refer to the same object that `target_objref` refers to.""" - halo.lib.alias_objrefs(self.handle, alias_objref, target_objref) + ray.lib.alias_objrefs(self.handle, alias_objref, target_objref) def register_function(self, function): """Notify the scheduler that this worker can execute the function with name `func_name`. Store the function `function` locally.""" - halo.lib.register_function(self.handle, function.func_name, len(function.return_types)) + ray.lib.register_function(self.handle, function.func_name, len(function.return_types)) self.functions[function.func_name] = function def submit_task(self, func_name, args): """Tell the scheduler to schedule the execution of the function with name `func_name` with arguments `args`. Retrieve object references for the outputs of the function from the scheduler and immediately return them.""" task_capsule = serialization.serialize_task(self.handle, func_name, args) - objrefs = halo.lib.submit_task(self.handle, task_capsule) + objrefs = ray.lib.submit_task(self.handle, task_capsule) return objrefs # We make `global_worker` a global variable so that there is one worker per worker process. global_worker = Worker() def scheduler_info(worker=global_worker): - return halo.lib.scheduler_info(worker.handle); + return ray.lib.scheduler_info(worker.handle); def register_module(module, recursive=False, worker=global_worker): print "registering functions in module {}.".format(module.__name__) @@ -69,32 +69,32 @@ def register_module(module, recursive=False, worker=global_worker): def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker): if hasattr(worker, "handle"): del worker.handle - worker.handle = halo.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) + worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) def disconnect(worker=global_worker): - halo.lib.disconnect(worker.handle) + ray.lib.disconnect(worker.handle) def pull(objref, worker=global_worker): - halo.lib.request_object(worker.handle, objref) + ray.lib.request_object(worker.handle, objref) return worker.get_object(objref) def push(value, worker=global_worker): - objref = halo.lib.get_objref(worker.handle) + objref = ray.lib.get_objref(worker.handle) worker.put_object(objref, value) return objref def main_loop(worker=global_worker): - if not halo.lib.connected(worker.handle): + if not ray.lib.connected(worker.handle): raise Exception("Worker is attempting to enter main_loop but has not been connected yet.") - halo.lib.start_worker_service(worker.handle) + ray.lib.start_worker_service(worker.handle) def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope more quickly, which is useful for inspecting reference counts func_name, args, return_objrefs = serialization.deserialize_task(worker.handle, task) arguments = get_arguments_for_execution(worker.functions[func_name], args, worker) # get args from objstore outputs = worker.functions[func_name].executor(arguments) # execute the function store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store - halo.lib.notify_task_completed(worker.handle) # notify the scheduler that the task has completed + ray.lib.notify_task_completed(worker.handle) # notify the scheduler that the task has completed while True: - task = halo.lib.wait_for_next_task(worker.handle) + task = ray.lib.wait_for_next_task(worker.handle) process_task(task) def remote(arg_types, return_types, worker=global_worker): @@ -148,7 +148,7 @@ def check_return_values(function, result): if len(result) != len(function.return_types): raise Exception("The @remote decorator for function {} has {} return values with types {}, but {} returned {} values.".format(function.__name__, len(function.return_types), function.return_types, function.__name__, len(result))) for i in range(len(result)): - if (not isinstance(result[i], function.return_types[i])) and (not isinstance(result[i], halo.lib.ObjRef)): + if (not isinstance(result[i], function.return_types[i])) and (not isinstance(result[i], ray.lib.ObjRef)): raise Exception("The {}th return value for function {} has type {}, but the @remote decorator expected a return value of type {} or an ObjRef.".format(i, function.__name__, type(result[i]), function.return_types[i])) # helper method, this should not be called by the user @@ -167,7 +167,7 @@ def check_arguments(function, args): else: assert False, "This code should be unreachable." - if isinstance(arg, halo.lib.ObjRef): + if isinstance(arg, ray.lib.ObjRef): # TODO(rkn): When we have type information in the ObjRef, do type checking here. pass else: @@ -194,7 +194,7 @@ def get_arguments_for_execution(function, args, worker=global_worker): else: assert False, "This code should be unreachable." - if isinstance(arg, halo.lib.ObjRef): + if isinstance(arg, ray.lib.ObjRef): # get the object from the local object store print "Getting argument {} for function {}.".format(i, function.__name__) argument = worker.get_object(arg) @@ -214,7 +214,7 @@ def store_outputs_in_objstore(objrefs, outputs, worker=global_worker): outputs = (outputs,) for i in range(len(objrefs)): - if isinstance(outputs[i], halo.lib.ObjRef): + if isinstance(outputs[i], ray.lib.ObjRef): # An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to print "Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val) worker.alias_objrefs(objrefs[i], outputs[i]) diff --git a/lib/python/setup.py b/lib/python/setup.py index e5ba911a1..77a4238df 100644 --- a/lib/python/setup.py +++ b/lib/python/setup.py @@ -3,15 +3,15 @@ import sys from setuptools import setup, Extension, find_packages import setuptools -# because of relative paths, this must be run from inside halo/lib/python/ +# because of relative paths, this must be run from inside ray/lib/python/ setup( - name = "halo", + name = "ray", version = "0.1.dev0", use_2to3=True, packages=find_packages(), package_data = { - "halo": ["libhalolib.so", "scheduler", "objstore"] + "ray": ["libraylib.so", "scheduler", "objstore"] }, zip_safe=False ) diff --git a/protos/halo.proto b/protos/ray.proto similarity index 100% rename from protos/halo.proto rename to protos/ray.proto diff --git a/src/computation_graph.cc b/src/computation_graph.cc index cbca5918c..c554b5e84 100644 --- a/src/computation_graph.cc +++ b/src/computation_graph.cc @@ -4,7 +4,7 @@ OperationId ComputationGraph::add_operation(std::unique_ptr operation OperationId operationid = operations_.size(); OperationId creator_operationid = operation->creator_operationid(); if (spawned_operations_.size() != operationid) { - HALO_LOG(HALO_FATAL, "ComputationGraph is attempting to call add_operation, but spawned_operations_.size() != operationid."); + RAY_LOG(RAY_FATAL, "ComputationGraph is attempting to call add_operation, but spawned_operations_.size() != operationid."); } operations_.emplace_back(std::move(operation)); if (creator_operationid != NO_OPERATION && creator_operationid != ROOT_OPERATION) { @@ -16,10 +16,10 @@ OperationId ComputationGraph::add_operation(std::unique_ptr operation const Task& ComputationGraph::get_task(OperationId operationid) { if (operationid >= operations_.size()) { - HALO_LOG(HALO_FATAL, "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); + RAY_LOG(RAY_FATAL, "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); } if (!operations_[operationid]->has_task()) { - HALO_LOG(HALO_FATAL, "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); + RAY_LOG(RAY_FATAL, "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); } return operations_[operationid]->task(); } diff --git a/src/computation_graph.h b/src/computation_graph.h index 494c94f2c..25c83534e 100644 --- a/src/computation_graph.h +++ b/src/computation_graph.h @@ -1,11 +1,11 @@ -#ifndef HALO_COMPUTATIONGRAPH_H -#define HALO_COMPUTATIONGRAPH_H +#ifndef RAY_COMPUTATIONGRAPH_H +#define RAY_COMPUTATIONGRAPH_H #include #include -#include "halo/halo.h" -#include "halo.grpc.pb.h" +#include "ray/ray.h" +#include "ray.grpc.pb.h" #include "types.pb.h" // used to represent the root operation (that is, the driver code) diff --git a/src/ipc.cc b/src/ipc.cc index 5ff3de2e4..4d39a151c 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -31,9 +31,9 @@ MemorySegmentPool::MemorySegmentPool(ObjStoreId objstoreid, bool create) : objst // creates a memory segment if it is not already there; if the pool is in create mode, // space is allocated, if it is in open mode, the shared memory is mapped into the process void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { - HALO_LOG(HALO_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_); + RAY_LOG(RAY_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_); if (segmentid != segments_.size() && create_mode_) { - HALO_LOG(HALO_FATAL, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); + RAY_LOG(RAY_FATAL, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); } if (segmentid >= segments_.size()) { // resize and initialize segments_ int current_size = segments_.size(); @@ -47,7 +47,7 @@ void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { return; } if (segments_[segmentid].second == SegmentStatusType::CLOSED) { - HALO_LOG(HALO_FATAL, "Attempting to open segmentid " << segmentid << ", but segments_[segmentid].second == SegmentStatusType::CLOSED."); + RAY_LOG(RAY_FATAL, "Attempting to open segmentid " << segmentid << ", but segments_[segmentid].second == SegmentStatusType::CLOSED."); } std::string segment_name = get_segment_name(segmentid); if (create_mode_) { @@ -61,7 +61,7 @@ void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { } void MemorySegmentPool::close_segment(SegmentId segmentid) { - HALO_LOG(HALO_DEBUG, "closing segmentid " << segmentid); + RAY_LOG(RAY_DEBUG, "closing segmentid " << segmentid); std::string segment_name = get_segment_name(segmentid); shared_memory_object::remove(segment_name.c_str()); segments_[segmentid].first.reset(); @@ -70,7 +70,7 @@ void MemorySegmentPool::close_segment(SegmentId segmentid) { ObjHandle MemorySegmentPool::allocate(size_t size) { if (!create_mode_) { // allocate is called only by the object store - HALO_LOG(HALO_FATAL, "Attempting to call allocate, but create_mode_ is false"); + RAY_LOG(RAY_FATAL, "Attempting to call allocate, but create_mode_ is false"); } // TODO(pcm): at the moment, this always creates a new segment, this will be changed SegmentId segmentid = segments_.size(); @@ -91,7 +91,7 @@ void MemorySegmentPool::deallocate(ObjHandle pointer) { // the process that will use the address uint8_t* MemorySegmentPool::get_address(ObjHandle pointer) { if (create_mode_ && segments_[pointer.segmentid()].second != SegmentStatusType::OPENED) { - HALO_LOG(HALO_FATAL, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet."); + RAY_LOG(RAY_FATAL, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet."); } if (!create_mode_) { open_segment(pointer.segmentid()); diff --git a/src/ipc.h b/src/ipc.h index 4ce4400e1..e05572ec6 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -1,5 +1,5 @@ -#ifndef HALO_IPC_H -#define HALO_IPC_H +#ifndef RAY_IPC_H +#define RAY_IPC_H #include #include @@ -10,7 +10,7 @@ #include #include -#include "halo/halo.h" +#include "ray/ray.h" using namespace boost::interprocess; @@ -42,7 +42,7 @@ public: queue_ = std::unique_ptr(new message_queue(open_only, name.c_str())); } } catch(interprocess_exception &ex) { - HALO_LOG(HALO_FATAL, "boost::interprocess exception: " << ex.what()); + RAY_LOG(RAY_FATAL, "boost::interprocess exception: " << ex.what()); } return true; }; @@ -55,7 +55,7 @@ public: try { queue_->send(object, sizeof(T), 0); } catch(interprocess_exception &ex) { - HALO_LOG(HALO_FATAL, "boost::interprocess exception: " << ex.what()); + RAY_LOG(RAY_FATAL, "boost::interprocess exception: " << ex.what()); } return true; }; @@ -66,7 +66,7 @@ public: try { queue_->receive(object, sizeof(T), recvd_size, priority); } catch(interprocess_exception &ex) { - HALO_LOG(HALO_FATAL, "boost::interprocess exception: " << ex.what()); + RAY_LOG(RAY_FATAL, "boost::interprocess exception: " << ex.what()); } return true; } diff --git a/src/objstore.cc b/src/objstore.cc index 4448edcf3..e6590a390 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -8,7 +8,7 @@ const size_t ObjStoreService::CHUNK_SIZE = 8 * 1024; // this method needs to be protected by a objstore_lock_ // TODO(rkn): Make sure that we do not in fact need the objstore_lock_. We want multiple deliveries to be able to happen simultaneously. void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { - HALO_LOG(HALO_DEBUG, "Objstore " << objstoreid_ << " is beginning to pull objref " << objref); + RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " is beginning to pull objref " << objref); ObjChunk chunk; ClientContext context; StreamObjToRequest stream_request; @@ -27,7 +27,7 @@ void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { segmentpool_lock_.unlock(); do { if (num_bytes + chunk.data().size() > total_size) { - HALO_LOG(HALO_FATAL, "The reader attempted to stream too many bytes."); + RAY_LOG(RAY_FATAL, "The reader attempted to stream too many bytes."); } std::memcpy(data, chunk.data().c_str(), chunk.data().size()); data += chunk.data().size(); @@ -37,10 +37,10 @@ void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { // finalize object if (num_bytes != total_size) { - HALO_LOG(HALO_FATAL, "Streamed objref " << objref << ", but num_bytes != total_size"); + RAY_LOG(RAY_FATAL, "Streamed objref " << objref << ", but num_bytes != total_size"); } object_ready(objref, chunk.metadata_offset()); - HALO_LOG(HALO_DEBUG, "finished streaming data, objref was " << objref << " and size was " << num_bytes); + RAY_LOG(RAY_DEBUG, "finished streaming data, objref was " << objref << " and size was " << num_bytes); } ObjStoreService::ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel) @@ -79,10 +79,10 @@ Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliver if (memory_[objref].second == MemoryStatusType::NOT_PRESENT) { } else if (memory_[objref].second == MemoryStatusType::DEALLOCATED) { - HALO_LOG(HALO_FATAL, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); + RAY_LOG(RAY_FATAL, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); } else { - HALO_LOG(HALO_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); + RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); return Status::OK; } memory_[objref].second = MemoryStatusType::PRE_ALLOCED; @@ -115,15 +115,15 @@ Status ObjStoreService::ObjStoreInfo(ServerContext* context, const ObjStoreInfoR } Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToRequest* request, ServerWriter* writer) { - HALO_LOG(HALO_DEBUG, "begin to stream data from object store " << objstoreid_); + RAY_LOG(RAY_DEBUG, "begin to stream data from object store " << objstoreid_); ObjChunk chunk; ObjRef objref = request->objref(); memory_lock_.lock(); if (objref >= memory_.size()) { - HALO_LOG(HALO_FATAL, "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); + RAY_LOG(RAY_FATAL, "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); } if (memory_[objref].second != MemoryStatusType::READY) { - HALO_LOG(HALO_FATAL, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); + RAY_LOG(RAY_FATAL, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); } ObjHandle handle = memory_[objref].first; memory_lock_.unlock(); // TODO(rkn): Make sure we don't still need to hold on to this lock. @@ -136,7 +136,7 @@ Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToReq chunk.set_total_size(size); chunk.set_data(head + i, std::min(CHUNK_SIZE, size - i)); if (!writer->Write(chunk)) { - HALO_LOG(HALO_FATAL, "stream connection prematurely closed") + RAY_LOG(RAY_FATAL, "stream connection prematurely closed") } } return Status::OK; @@ -146,20 +146,20 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq // NotifyAlias assumes that the objstore already holds canonical_objref ObjRef alias_objref = request->alias_objref(); ObjRef canonical_objref = request->canonical_objref(); - HALO_LOG(HALO_DEBUG, "Aliasing objref " << alias_objref << " with objref " << canonical_objref); + RAY_LOG(RAY_DEBUG, "Aliasing objref " << alias_objref << " with objref " << canonical_objref); { std::lock_guard memory_lock(memory_lock_); if (canonical_objref >= memory_.size()) { - HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") + RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") } if (memory_[canonical_objref].second == MemoryStatusType::NOT_READY) { - HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") + RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") } if (memory_[canonical_objref].second == MemoryStatusType::NOT_PRESENT) { - HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") + RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") } if (memory_[canonical_objref].second == MemoryStatusType::DEALLOCATED) { - HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") + RAY_LOG(RAY_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") } if (alias_objref >= memory_.size()) { memory_.resize(alias_objref + 1, std::make_pair(ObjHandle(), MemoryStatusType::NOT_PRESENT)); @@ -176,13 +176,13 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq Status ObjStoreService::DeallocateObject(ServerContext* context, const DeallocateObjectRequest* request, AckReply* reply) { ObjRef canonical_objref = request->canonical_objref(); - HALO_LOG(HALO_REFCOUNT, "Deallocating canonical_objref " << canonical_objref); + RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref); std::lock_guard memory_lock(memory_lock_); if (memory_[canonical_objref].second != MemoryStatusType::READY) { - HALO_LOG(HALO_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but memory_[canonical_objref].second = " << memory_[canonical_objref].second); + RAY_LOG(RAY_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but memory_[canonical_objref].second = " << memory_[canonical_objref].second); } if (canonical_objref >= memory_.size()) { - HALO_LOG(HALO_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); + RAY_LOG(RAY_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); } segmentpool_lock_.lock(); segmentpool_->deallocate(memory_[canonical_objref].first); @@ -208,7 +208,7 @@ void ObjStoreService::process_objstore_request(const ObjRequest request) { } break; default: { - HALO_LOG(HALO_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + RAY_LOG(RAY_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -237,13 +237,13 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { std::lock_guard memory_lock(memory_lock_); std::pair& item = memory_[request.objref]; if (item.second == MemoryStatusType::READY) { - HALO_LOG(HALO_DEBUG, "Responding to GET request: returning objref " << request.objref); + RAY_LOG(RAY_DEBUG, "Responding to GET request: returning objref " << request.objref); send_queues_[request.workerid].send(&item.first); } else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT || item.second == MemoryStatusType::PRE_ALLOCED) { std::lock_guard lock(pull_queue_lock_); pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); } else { - HALO_LOG(HALO_FATAL, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); + RAY_LOG(RAY_FATAL, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); } } break; @@ -252,7 +252,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { } break; default: { - HALO_LOG(HALO_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + RAY_LOG(RAY_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -264,17 +264,17 @@ void ObjStoreService::process_requests() { recv_queue_.receive(&request); switch (request.type) { case ObjRequestType::ALLOC: { - HALO_LOG(HALO_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Allocate object with objref " << request.objref << " and size " << request.size); + RAY_LOG(RAY_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Allocate object with objref " << request.objref << " and size " << request.size); process_worker_request(request); } break; case ObjRequestType::GET: { - HALO_LOG(HALO_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Get object with objref " << request.objref); + RAY_LOG(RAY_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Get object with objref " << request.objref); process_worker_request(request); } break; case ObjRequestType::WORKER_DONE: { - HALO_LOG(HALO_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Finalize object with objref " << request.objref); + RAY_LOG(RAY_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Finalize object with objref " << request.objref); process_worker_request(request); } break; @@ -283,7 +283,7 @@ void ObjStoreService::process_requests() { } break; default: { - HALO_LOG(HALO_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + RAY_LOG(RAY_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -309,9 +309,9 @@ ObjHandle ObjStoreService::alloc(ObjRef objref, size_t size) { ObjHandle handle = segmentpool_->allocate(size); segmentpool_lock_.unlock(); std::lock_guard memory_lock(memory_lock_); - HALO_LOG(HALO_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_); + RAY_LOG(RAY_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_); if (memory_[objref].second != MemoryStatusType::NOT_PRESENT && memory_[objref].second != MemoryStatusType::PRE_ALLOCED) { - HALO_LOG(HALO_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); + RAY_LOG(RAY_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); } memory_[objref].first = handle; memory_[objref].second = MemoryStatusType::NOT_READY; @@ -323,7 +323,7 @@ void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { std::lock_guard memory_lock(memory_lock_); std::pair& item = memory_[objref]; if (item.second != MemoryStatusType::NOT_READY) { - HALO_LOG(HALO_FATAL, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); + RAY_LOG(RAY_FATAL, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); } item.first.set_metadata_offset(metadata_offset); item.second = MemoryStatusType::READY; @@ -341,14 +341,14 @@ void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { void ObjStoreService::start_objstore_service() { communicator_thread_ = std::thread([this]() { - HALO_LOG(HALO_INFO, "started object store communicator server"); + RAY_LOG(RAY_INFO, "started object store communicator server"); process_requests(); }); } void start_objstore(const char* scheduler_addr, const char* objstore_addr) { auto scheduler_channel = grpc::CreateChannel(scheduler_addr, grpc::InsecureChannelCredentials()); - HALO_LOG(HALO_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr); + RAY_LOG(RAY_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr); std::string objstore_address(objstore_addr); ObjStoreService service(objstore_address, scheduler_channel); service.start_objstore_service(); @@ -365,7 +365,7 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) { int main(int argc, char** argv) { if (argc != 3) { - HALO_LOG(HALO_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)"); + RAY_LOG(RAY_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)"); return 1; } diff --git a/src/objstore.h b/src/objstore.h index ccc3076a6..bb7ce4a47 100644 --- a/src/objstore.h +++ b/src/objstore.h @@ -1,5 +1,5 @@ -#ifndef HALO_OBJSTORE_H -#define HALO_OBJSTORE_H +#ifndef RAY_OBJSTORE_H +#define RAY_OBJSTORE_H #include #include @@ -7,8 +7,8 @@ #include #include -#include "halo/halo.h" -#include "halo.grpc.pb.h" +#include "ray/ray.h" +#include "ray.grpc.pb.h" #include "types.pb.h" #include "ipc.h" diff --git a/src/halolib.cc b/src/raylib.cc similarity index 95% rename from src/halolib.cc rename to src/raylib.cc index c48e36c0f..39aaee13d 100644 --- a/src/halolib.cc +++ b/src/raylib.cc @@ -45,7 +45,7 @@ static int PyObjRef_init(PyObjRef *self, PyObject *args, PyObject *kwds) { } std::vector objrefs; objrefs.push_back(self->val); - HALO_LOG(HALO_REFCOUNT, "In PyObjRef_init, calling increment_reference_count for objref " << objrefs[0]); + RAY_LOG(RAY_REFCOUNT, "In PyObjRef_init, calling increment_reference_count for objref " << objrefs[0]); self->worker->increment_reference_count(objrefs); return 0; }; @@ -70,7 +70,7 @@ static PyMemberDef PyObjRef_members[] = { static PyTypeObject PyObjRefType = { PyObject_HEAD_INIT(NULL) 0, /* ob_size */ - "halo.ObjRef", /* tp_name */ + "ray.ObjRef", /* tp_name */ sizeof(PyObjRef), /* tp_basicsize */ 0, /* tp_itemsize */ (destructor)PyObjRef_dealloc, /* tp_dealloc */ @@ -89,7 +89,7 @@ static PyTypeObject PyObjRefType = { 0, /* tp_setattro */ 0, /* tp_as_buffer */ Py_TPFLAGS_DEFAULT, /* tp_flags */ - "Halo objects", /* tp_doc */ + "Ray objects", /* tp_doc */ 0, /* tp_traverse */ 0, /* tp_clear */ 0, /* tp_richcompare */ @@ -119,7 +119,7 @@ PyObject* make_pyobjref(PyObject* worker_capsule, ObjRef objref) { // Error handling -static PyObject *HaloError; +static PyObject *RayError; // Pass arguments from Python to C++ @@ -312,12 +312,12 @@ int serialize(PyObject* worker_capsule, PyObject* val, Obj* obj, std::vector 0) { @@ -423,7 +423,7 @@ PyObject* deserialize(PyObject* worker_capsule, const Obj& obj, std::vector 0) { @@ -434,12 +434,12 @@ PyObject* deserialize(PyObject* worker_capsule, const Obj& obj, std::vector 0) { - HALO_LOG(HALO_REFCOUNT, "In serialize_task, calling increment_reference_count for contained objrefs"); + RAY_LOG(RAY_REFCOUNT, "In serialize_task, calling increment_reference_count for contained objrefs"); worker->increment_reference_count(objrefs); } return PyCapsule_New(static_cast(task), "task", &TaskCapsule_Destructor); @@ -584,7 +584,7 @@ PyObject* deserialize_task(PyObject* self, PyObject* args) { return t; } -// Halo Python API +// Ray Python API PyObject* create_worker(PyObject* self, PyObject* args) { const char* scheduler_addr; @@ -692,7 +692,7 @@ PyObject* put_object(PyObject* self, PyObject* args) { return NULL; } if (!PyList_Check(contained_objrefs)) { - HALO_LOG(HALO_FATAL, "The contained_objrefs argument must be a list.") + RAY_LOG(RAY_FATAL, "The contained_objrefs argument must be a list.") } std::vector vec_contained_objrefs; size_t size = PyList_Size(contained_objrefs); @@ -773,7 +773,7 @@ PyObject* scheduler_info(PyObject* self, PyObject* args) { return dict; } -static PyMethodDef HaloLibMethods[] = { +static PyMethodDef RayLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, { "put_arrow", put_arrow, METH_VARARGS, "put an arrow array on the local object store"}, @@ -798,18 +798,18 @@ static PyMethodDef HaloLibMethods[] = { { NULL, NULL, 0, NULL } }; -PyMODINIT_FUNC initlibhalolib(void) { +PyMODINIT_FUNC initlibraylib(void) { PyObject* m; PyObjRefType.tp_new = PyType_GenericNew; if (PyType_Ready(&PyObjRefType) < 0) { return; } - m = Py_InitModule3("libhalolib", HaloLibMethods, "Python C Extension for Halo"); + m = Py_InitModule3("libraylib", RayLibMethods, "Python C Extension for Ray"); Py_INCREF(&PyObjRefType); PyModule_AddObject(m, "ObjRef", (PyObject *)&PyObjRefType); - HaloError = PyErr_NewException("halo.error", NULL, NULL); - Py_INCREF(HaloError); - PyModule_AddObject(m, "error", HaloError); + RayError = PyErr_NewException("ray.error", NULL, NULL); + Py_INCREF(RayError); + PyModule_AddObject(m, "error", RayError); import_array(); } diff --git a/src/scheduler.cc b/src/scheduler.cc index b06421654..78f970e5a 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -14,7 +14,7 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ if (fntable_.find(task->name()) == fntable_.end()) { // TODO(rkn): In the future, this should probably not be fatal. Instead, propagate the error back to the worker. - HALO_LOG(HALO_FATAL, "The function " << task->name() << " has not been registered by any worker."); + RAY_LOG(RAY_FATAL, "The function " << task->name() << " has not been registered by any worker."); } size_t num_return_vals = fntable_[task->name()].num_return_vals(); @@ -29,8 +29,8 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ } { std::lock_guard reference_counts_lock(reference_counts_lock_); // we grab this lock because increment_ref_count assumes it has been acquired - increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before we reply to the worker that called SubmitTask. The corresponding decrement will happen in submit_task in halolib. - increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before the task is scheduled on the worker. The corresponding decrement will happen in deserialize_task in halolib. + increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before we reply to the worker that called SubmitTask. The corresponding decrement will happen in submit_task in raylib. + increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before the task is scheduled on the worker. The corresponding decrement will happen in deserialize_task in raylib. } auto operation = std::unique_ptr(new Operation()); @@ -64,7 +64,7 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ ObjRef objref = request->objref(); if (objref >= size) { - HALO_LOG(HALO_FATAL, "internal error: no object with objref " << objref << " exists"); + RAY_LOG(RAY_FATAL, "internal error: no object with objref " << objref << " exists"); } pull_queue_lock_.lock(); @@ -77,23 +77,23 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) { ObjRef alias_objref = request->alias_objref(); ObjRef target_objref = request->target_objref(); - HALO_LOG(HALO_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); + RAY_LOG(RAY_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); if (alias_objref == target_objref) { - HALO_LOG(HALO_FATAL, "internal error: attempting to alias objref " << alias_objref << " with itself."); + RAY_LOG(RAY_FATAL, "internal error: attempting to alias objref " << alias_objref << " with itself."); } objtable_lock_.lock(); size_t size = objtable_.size(); objtable_lock_.unlock(); if (alias_objref >= size) { - HALO_LOG(HALO_FATAL, "internal error: no object with objref " << alias_objref << " exists"); + RAY_LOG(RAY_FATAL, "internal error: no object with objref " << alias_objref << " exists"); } if (target_objref >= size) { - HALO_LOG(HALO_FATAL, "internal error: no object with objref " << target_objref << " exists"); + RAY_LOG(RAY_FATAL, "internal error: no object with objref " << target_objref << " exists"); } { std::lock_guard target_objrefs_lock(target_objrefs_lock_); if (target_objrefs_[alias_objref] != UNITIALIZED_ALIAS) { - HALO_LOG(HALO_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); + RAY_LOG(RAY_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); } target_objrefs_[alias_objref] = target_objref; } @@ -121,7 +121,7 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo std::pair info = register_worker(request->worker_address(), request->objstore_address()); WorkerId workerid = info.first; ObjStoreId objstoreid = info.second; - HALO_LOG(HALO_INFO, "registered worker with workerid " << workerid); + RAY_LOG(RAY_INFO, "registered worker with workerid " << workerid); reply->set_workerid(workerid); reply->set_objstoreid(objstoreid); schedule(); @@ -129,7 +129,7 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo } Status SchedulerService::RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) { - HALO_LOG(HALO_INFO, "register function " << request->fnname() << " from workerid " << request->workerid()); + RAY_LOG(RAY_INFO, "register function " << request->fnname() << " from workerid " << request->workerid()); register_function(request->fnname(), request->workerid(), request->num_return_vals()); schedule(); return Status::OK; @@ -137,7 +137,7 @@ Status SchedulerService::RegisterFunction(ServerContext* context, const Register Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* request, AckReply* reply) { ObjRef objref = request->objref(); - HALO_LOG(HALO_DEBUG, "object " << objref << " ready on store " << request->objstoreid()); + RAY_LOG(RAY_DEBUG, "object " << objref << " ready on store " << request->objstoreid()); add_canonical_objref(objref); add_location(objref, request->objstoreid()); schedule(); @@ -145,7 +145,7 @@ Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* } Status SchedulerService::WorkerReady(ServerContext* context, const WorkerReadyRequest* request, AckReply* reply) { - HALO_LOG(HALO_INFO, "worker " << request->workerid() << " reported back"); + RAY_LOG(RAY_INFO, "worker " << request->workerid() << " reported back"); { std::lock_guard lock(avail_workers_lock_); avail_workers_.push_back(request->workerid()); @@ -157,7 +157,7 @@ Status SchedulerService::WorkerReady(ServerContext* context, const WorkerReadyRe Status SchedulerService::IncrementRefCount(ServerContext* context, const IncrementRefCountRequest* request, AckReply* reply) { int num_objrefs = request->objref_size(); if (num_objrefs == 0) { - HALO_LOG(HALO_FATAL, "Scheduler received IncrementRefCountRequest with 0 objrefs."); + RAY_LOG(RAY_FATAL, "Scheduler received IncrementRefCountRequest with 0 objrefs."); } std::vector objrefs; for (int i = 0; i < num_objrefs; ++i) { @@ -171,7 +171,7 @@ Status SchedulerService::IncrementRefCount(ServerContext* context, const Increme Status SchedulerService::DecrementRefCount(ServerContext* context, const DecrementRefCountRequest* request, AckReply* reply) { int num_objrefs = request->objref_size(); if (num_objrefs == 0) { - HALO_LOG(HALO_FATAL, "Scheduler received DecrementRefCountRequest with 0 objrefs."); + RAY_LOG(RAY_FATAL, "Scheduler received DecrementRefCountRequest with 0 objrefs."); } std::vector objrefs; for (int i = 0; i < num_objrefs; ++i) { @@ -186,11 +186,11 @@ Status SchedulerService::AddContainedObjRefs(ServerContext* context, const AddCo ObjRef objref = request->objref(); // if (!is_canonical(objref)) { // TODO(rkn): Perhaps we don't need this check. It won't work because the objstore may not have called ObjReady yet. - // HALO_LOG(HALO_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref); + // RAY_LOG(RAY_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref); // } std::lock_guard contained_objrefs_lock(contained_objrefs_lock_); if (contained_objrefs_[objref].size() != 0) { - HALO_LOG(HALO_FATAL, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); + RAY_LOG(RAY_FATAL, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); } for (int i = 0; i < request->contained_objref_size(); ++i) { contained_objrefs_[objref].push_back(request->contained_objref(i)); @@ -212,10 +212,10 @@ Status SchedulerService::SchedulerInfo(ServerContext* context, const SchedulerIn // deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) { if (from == to) { - HALO_LOG(HALO_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); + RAY_LOG(RAY_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); } if (!has_canonical_objref(objref)) { - HALO_LOG(HALO_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); + RAY_LOG(RAY_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); } ClientContext context; AckReply reply; @@ -235,7 +235,7 @@ void SchedulerService::schedule() { } else if (scheduling_algorithm_ == SCHEDULING_ALGORITHM_LOCALITY_AWARE) { schedule_tasks_location_aware(); // See what we can do in task_queue_ } else { - HALO_LOG(HALO_FATAL, "scheduling algorithm not known"); + RAY_LOG(RAY_FATAL, "scheduling algorithm not known"); } perform_notify_aliases(); // See what we can do in alias_notification_queue_ } @@ -247,7 +247,7 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { ClientContext context; ExecuteTaskRequest request; ExecuteTaskReply reply; - HALO_LOG(HALO_INFO, "starting to send arguments"); + RAY_LOG(RAY_INFO, "starting to send arguments"); for (size_t i = 0; i < task.arg_size(); ++i) { if (!task.arg(i).has_obj()) { ObjRef objref = task.arg(i).ref(); @@ -259,7 +259,7 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { } attempt_notify_alias(get_store(workerid), objref, canonical_objref); - HALO_LOG(HALO_DEBUG, "task contains object ref " << canonical_objref); + RAY_LOG(RAY_DEBUG, "task contains object ref " << canonical_objref); std::lock_guard objtable_lock(objtable_lock_); auto &objstores = objtable_[canonical_objref]; std::lock_guard workers_lock(workers_lock_); @@ -290,7 +290,7 @@ bool SchedulerService::can_run(const Task& task) { } std::pair SchedulerService::register_worker(const std::string& worker_address, const std::string& objstore_address) { - HALO_LOG(HALO_INFO, "registering worker " << worker_address << " connected to object store " << objstore_address); + RAY_LOG(RAY_INFO, "registering worker " << worker_address << " connected to object store " << objstore_address); ObjStoreId objstoreid = std::numeric_limits::max(); for (int num_attempts = 0; num_attempts < 5; ++num_attempts) { std::lock_guard lock(objstores_lock_); @@ -304,7 +304,7 @@ std::pair SchedulerService::register_worker(const std::str } } if (objstoreid == std::numeric_limits::max()) { - HALO_LOG(HALO_FATAL, "object store with address " << objstore_address << " not yet registered"); + RAY_LOG(RAY_FATAL, "object store with address " << objstore_address << " not yet registered"); } workers_lock_.lock(); WorkerId workerid = workers_.size(); @@ -334,16 +334,16 @@ ObjRef SchedulerService::register_new_object() { ObjRef reference_counts_size = reference_counts_.size(); ObjRef contained_objrefs_size = contained_objrefs_.size(); if (objtable_size != target_objrefs_size) { - HALO_LOG(HALO_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); + RAY_LOG(RAY_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); } if (objtable_size != reverse_target_objrefs_size) { - HALO_LOG(HALO_FATAL, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); + RAY_LOG(RAY_FATAL, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); } if (objtable_size != reference_counts_size) { - HALO_LOG(HALO_FATAL, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); + RAY_LOG(RAY_FATAL, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); } if (objtable_size != contained_objrefs_size) { - HALO_LOG(HALO_FATAL, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); + RAY_LOG(RAY_FATAL, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); } objtable_.push_back(std::vector()); target_objrefs_.push_back(UNITIALIZED_ALIAS); @@ -356,11 +356,11 @@ ObjRef SchedulerService::register_new_object() { void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) { // add_location must be called with a canonical objref if (!is_canonical(canonical_objref)) { - HALO_LOG(HALO_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); + RAY_LOG(RAY_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); } std::lock_guard objtable_lock(objtable_lock_); if (canonical_objref >= objtable_.size()) { - HALO_LOG(HALO_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); + RAY_LOG(RAY_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); } // do a binary search auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid); @@ -372,10 +372,10 @@ void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstore void SchedulerService::add_canonical_objref(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); if (objref >= target_objrefs_.size()) { - HALO_LOG(HALO_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); + RAY_LOG(RAY_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); } if (target_objrefs_[objref] != UNITIALIZED_ALIAS && target_objrefs_[objref] != objref) { - HALO_LOG(HALO_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); + RAY_LOG(RAY_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); } target_objrefs_[objref] = objref; } @@ -433,7 +433,7 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { std::mt19937 rng; if (!is_canonical(canonical_objref)) { - HALO_LOG(HALO_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); + RAY_LOG(RAY_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); } std::uniform_int_distribution uni(0, objtable_[canonical_objref].size() - 1); ObjStoreId objstoreid = objtable_[canonical_objref][uni(rng)]; @@ -443,7 +443,7 @@ ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { bool SchedulerService::is_canonical(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); if (target_objrefs_[objref] == UNITIALIZED_ALIAS) { - HALO_LOG(HALO_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); + RAY_LOG(RAY_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); } return objref == target_objrefs_[objref]; } @@ -456,11 +456,11 @@ void SchedulerService::perform_pulls() { ObjRef objref = pull.second; WorkerId workerid = pull.first; if (!has_canonical_objref(objref)) { - HALO_LOG(HALO_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing"); + RAY_LOG(RAY_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing"); continue; } ObjRef canonical_objref = get_canonical_objref(objref); - HALO_LOG(HALO_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); + RAY_LOG(RAY_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); objtable_lock_.lock(); int num_stores = objtable_[canonical_objref].size(); @@ -538,7 +538,7 @@ void SchedulerService::schedule_tasks_location_aware() { if (!task.arg(j).has_obj()) { ObjRef objref = task.arg(j).ref(); if (!has_canonical_objref(objref)) { - HALO_LOG(HALO_FATAL, "no canonical object ref found even though task is ready; that should not be possible!"); + RAY_LOG(RAY_FATAL, "no canonical object ref found even though task is ready; that should not be possible!"); } ObjRef canonical_objref = get_canonical_objref(objref); // check if the object is already in the local object store @@ -585,7 +585,7 @@ bool SchedulerService::has_canonical_objref(ObjRef objref) { ObjRef objref_temp = objref; while (true) { if (objref_temp >= target_objrefs_.size()) { - HALO_LOG(HALO_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + RAY_LOG(RAY_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); } if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { return false; @@ -603,16 +603,16 @@ ObjRef SchedulerService::get_canonical_objref(ObjRef objref) { ObjRef objref_temp = objref; while (true) { if (objref_temp >= target_objrefs_.size()) { - HALO_LOG(HALO_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + RAY_LOG(RAY_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); } if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { - HALO_LOG(HALO_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); + RAY_LOG(RAY_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); } if (target_objrefs_[objref_temp] == objref_temp) { return objref_temp; } objref_temp = target_objrefs_[objref_temp]; - HALO_LOG(HALO_ALIAS, "Looping in get_canonical_objref."); + RAY_LOG(RAY_ALIAS, "Looping in get_canonical_objref."); } } @@ -646,7 +646,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) { // these methods require reference_counts_lock_ to have been acquired, and // so the lock must before outside of these methods (it is acquired in // DecrementRefCount). - HALO_LOG(HALO_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << "."); + RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << "."); { std::lock_guard objtable_lock(objtable_lock_); auto &objstores = objtable_[canonical_objref]; @@ -657,7 +657,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) { DeallocateObjectRequest request; request.set_canonical_objref(canonical_objref); ObjStoreId objstoreid = objstores[i]; - HALO_LOG(HALO_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid); + RAY_LOG(RAY_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid); objstores_[objstoreid].objstore_stub->DeallocateObject(&context, request, &reply); } objtable_[canonical_objref].clear(); @@ -670,10 +670,10 @@ void SchedulerService::increment_ref_count(std::vector &objrefs) { for (int i = 0; i < objrefs.size(); ++i) { ObjRef objref = objrefs[i]; if (reference_counts_[objref] == DEALLOCATED) { - HALO_LOG(HALO_FATAL, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); + RAY_LOG(RAY_FATAL, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); } reference_counts_[objref] += 1; - HALO_LOG(HALO_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << reference_counts_[objref]); + RAY_LOG(RAY_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << reference_counts_[objref]); } } @@ -682,13 +682,13 @@ void SchedulerService::decrement_ref_count(std::vector &objrefs) { for (int i = 0; i < objrefs.size(); ++i) { ObjRef objref = objrefs[i]; if (reference_counts_[objref] == DEALLOCATED) { - HALO_LOG(HALO_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); + RAY_LOG(RAY_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); } if (reference_counts_[objref] == 0) { - HALO_LOG(HALO_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); + RAY_LOG(RAY_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); } reference_counts_[objref] -= 1; - HALO_LOG(HALO_REFCOUNT, "Decremented ref count for objref " << objref << ". New reference count is " << reference_counts_[objref]); + RAY_LOG(RAY_REFCOUNT, "Decremented ref count for objref " << objref << ". New reference count is " << reference_counts_[objref]); // See if we can deallocate the object std::vector equivalent_objrefs; get_equivalent_objrefs(objref, equivalent_objrefs); @@ -702,7 +702,7 @@ void SchedulerService::decrement_ref_count(std::vector &objrefs) { if (can_deallocate) { ObjRef canonical_objref = equivalent_objrefs[0]; if (!is_canonical(canonical_objref)) { - HALO_LOG(HALO_FATAL, "canonical_objref is not canonical."); + RAY_LOG(RAY_FATAL, "canonical_objref is not canonical."); } deallocate_object(canonical_objref); for (int j = 0; j < equivalent_objrefs.size(); ++j) { @@ -724,7 +724,7 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector std::lock_guard target_objrefs_lock(target_objrefs_lock_); ObjRef downstream_objref = objref; while (target_objrefs_[downstream_objref] != downstream_objref && target_objrefs_[downstream_objref] != UNITIALIZED_ALIAS) { - HALO_LOG(HALO_ALIAS, "Looping in get_equivalent_objrefs"); + RAY_LOG(RAY_ALIAS, "Looping in get_equivalent_objrefs"); downstream_objref = target_objrefs_[downstream_objref]; } std::lock_guard reverse_target_objrefs_lock(reverse_target_objrefs_lock_); @@ -755,7 +755,7 @@ char* get_cmd_option(char** begin, char** end, const std::string& option) { int main(int argc, char** argv) { SchedulingAlgorithmType scheduling_algorithm = SCHEDULING_ALGORITHM_LOCALITY_AWARE; if (argc < 2) { - HALO_LOG(HALO_FATAL, "scheduler: expected at least one argument (scheduler ip address)"); + RAY_LOG(RAY_FATAL, "scheduler: expected at least one argument (scheduler ip address)"); return 1; } if (argc > 2) { diff --git a/src/scheduler.h b/src/scheduler.h index 326220250..0692fdc2b 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,5 +1,5 @@ -#ifndef HALO_SCHEDULER_H -#define HALO_SCHEDULER_H +#ifndef RAY_SCHEDULER_H +#define RAY_SCHEDULER_H #include @@ -10,8 +10,8 @@ #include -#include "halo/halo.h" -#include "halo.grpc.pb.h" +#include "ray/ray.h" +#include "ray.grpc.pb.h" #include "types.pb.h" #include "computation_graph.h" diff --git a/src/utils.h b/src/utils.h index 218c3c355..cf907b434 100644 --- a/src/utils.h +++ b/src/utils.h @@ -1,5 +1,5 @@ -#ifndef HALO_UTILS_H -#define HALO_UTILS_H +#ifndef RAY_UTILS_H +#define RAY_UTILS_H inline std::string::iterator split_ip_address(std::string& ip_address) { if (ip_address[0] == '[') { // IPv6 @@ -10,11 +10,11 @@ inline std::string::iterator split_ip_address(std::string& ip_address) { if(split_end != ip_address.end() && *split_end == ':') { return split_end; } - HALO_LOG(HALO_FATAL, "ip address should contain a port number"); + RAY_LOG(RAY_FATAL, "ip address should contain a port number"); } else { // IPv4 auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base(); if (split_point == ip_address.begin()) { - HALO_LOG(HALO_FATAL, "ip address should contain a port number"); + RAY_LOG(RAY_FATAL, "ip address should contain a port number"); } else { return split_point; } diff --git a/src/worker.cc b/src/worker.cc index 125a908a2..874318bec 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -5,12 +5,12 @@ #include extern "C" { - static PyObject *HaloError; + static PyObject *RayError; } Status WorkerServiceImpl::ExecuteTask(ServerContext* context, const ExecuteTaskRequest* request, ExecuteTaskReply* reply) { task_ = request->task(); // Copy task - HALO_LOG(HALO_INFO, "invoked task " << request->task().name()); + RAY_LOG(RAY_INFO, "invoked task " << request->task().name()); Task* taskptr = &task_; send_queue_.send(&taskptr); return Status::OK; @@ -26,7 +26,7 @@ Worker::Worker(const std::string& worker_address, std::shared_ptr sched SubmitTaskReply Worker::submit_task(SubmitTaskRequest* request) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform submit_task, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform submit_task, but connected_ = " << connected_ << "."); } SubmitTaskReply reply; ClientContext context; @@ -52,7 +52,7 @@ void Worker::register_worker(const std::string& worker_address, const std::strin void Worker::request_object(ObjRef objref) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform request_object, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform request_object, but connected_ = " << connected_ << "."); } RequestObjRequest request; request.set_workerid(workerid_); @@ -66,7 +66,7 @@ void Worker::request_object(ObjRef objref) { ObjRef Worker::get_objref() { // first get objref for the new object if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform get_objref, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform get_objref, but connected_ = " << connected_ << "."); } PushObjRequest push_request; PushObjReply push_reply; @@ -78,7 +78,7 @@ ObjRef Worker::get_objref() { slice Worker::get_object(ObjRef objref) { // get_object assumes that objref is a canonical objref if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform get_object, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform get_object, but connected_ = " << connected_ << "."); } ObjRequest request; request.workerid = workerid_; @@ -97,7 +97,7 @@ slice Worker::get_object(ObjRef objref) { // contained_objrefs is a vector of all the objrefs contained in obj void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &contained_objrefs) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform put_object, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform put_object, but connected_ = " << connected_ << "."); } std::string data; obj->SerializeToString(&data); // TODO(pcm): get rid of this serialization @@ -108,7 +108,7 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont request.size = data.size(); request_obj_queue_.send(&request); if (contained_objrefs.size() > 0) { - HALO_LOG(HALO_REFCOUNT, "In put_object, calling increment_reference_count for contained objrefs"); + RAY_LOG(RAY_REFCOUNT, "In put_object, calling increment_reference_count for contained objrefs"); increment_reference_count(contained_objrefs); // Notify the scheduler that some object references are serialized in the objstore. } ObjHandle result; @@ -135,14 +135,14 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont arrow::Status _s = (s); \ if (!_s.ok()) { \ std::string _errmsg = std::string(msg) + _s.ToString(); \ - PyErr_SetString(HaloError, _errmsg.c_str()); \ + PyErr_SetString(RayError, _errmsg.c_str()); \ return NULL; \ } \ } while (0); PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform put_arrow, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform put_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; pynumbuf::PythonObjectWriter writer; @@ -168,7 +168,7 @@ PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { PyObject* Worker::get_arrow(ObjRef objref) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform get_arrow, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform get_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; request.workerid = workerid_; @@ -186,7 +186,7 @@ PyObject* Worker::get_arrow(ObjRef objref) { bool Worker::is_arrow(ObjRef objref) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform is_arrow, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform is_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; request.workerid = workerid_; @@ -200,7 +200,7 @@ bool Worker::is_arrow(ObjRef objref) { void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform alias_objrefs, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform alias_objrefs, but connected_ = " << connected_ << "."); } ClientContext context; AliasObjRefsRequest request; @@ -212,14 +212,14 @@ void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { void Worker::increment_reference_count(std::vector &objrefs) { if (!connected_) { - HALO_LOG(HALO_DEBUG, "Attempting to increment_reference_count for objrefs, but connected_ = " << connected_ << " so returning instead."); + RAY_LOG(RAY_DEBUG, "Attempting to increment_reference_count for objrefs, but connected_ = " << connected_ << " so returning instead."); return; } if (objrefs.size() > 0) { ClientContext context; IncrementRefCountRequest request; for (int i = 0; i < objrefs.size(); ++i) { - HALO_LOG(HALO_REFCOUNT, "Incrementing reference count for objref " << objrefs[i]); + RAY_LOG(RAY_REFCOUNT, "Incrementing reference count for objref " << objrefs[i]); request.add_objref(objrefs[i]); } AckReply reply; @@ -229,14 +229,14 @@ void Worker::increment_reference_count(std::vector &objrefs) { void Worker::decrement_reference_count(std::vector &objrefs) { if (!connected_) { - HALO_LOG(HALO_DEBUG, "Attempting to decrement_reference_count, but connected_ = " << connected_ << " so returning instead."); + RAY_LOG(RAY_DEBUG, "Attempting to decrement_reference_count, but connected_ = " << connected_ << " so returning instead."); return; } if (objrefs.size() > 0) { ClientContext context; DecrementRefCountRequest request; for (int i = 0; i < objrefs.size(); ++i) { - HALO_LOG(HALO_REFCOUNT, "Decrementing reference count for objref " << objrefs[i]); + RAY_LOG(RAY_REFCOUNT, "Decrementing reference count for objref " << objrefs[i]); request.add_objref(objrefs[i]); } AckReply reply; @@ -246,7 +246,7 @@ void Worker::decrement_reference_count(std::vector &objrefs) { void Worker::register_function(const std::string& name, size_t num_return_vals) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform register_function, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform register_function, but connected_ = " << connected_ << "."); } ClientContext context; RegisterFunctionRequest request; @@ -265,7 +265,7 @@ Task* Worker::receive_next_task() { void Worker::notify_task_completed() { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to perform notify_task_completed, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to perform notify_task_completed, but connected_ = " << connected_ << "."); } ClientContext context; WorkerReadyRequest request; @@ -285,7 +285,7 @@ bool Worker::connected() { // TODO(rkn): Should we be using pointers or references? And should they be const? void Worker::scheduler_info(ClientContext &context, SchedulerInfoRequest &request, SchedulerInfoReply &reply) { if (!connected_) { - HALO_LOG(HALO_FATAL, "Attempting to get scheduler info, but connected_ = " << connected_ << "."); + RAY_LOG(RAY_FATAL, "Attempting to get scheduler info, but connected_ = " << connected_ << "."); } scheduler_stub_->SchedulerInfo(&context, request, &reply); } @@ -306,7 +306,7 @@ void Worker::start_worker_service() { builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - HALO_LOG(HALO_INFO, "worker server listening on " << service_address); + RAY_LOG(RAY_INFO, "worker server listening on " << service_address); server->Wait(); }); } diff --git a/src/worker.h b/src/worker.h index f827cbe22..e0a9374f6 100644 --- a/src/worker.h +++ b/src/worker.h @@ -1,5 +1,5 @@ -#ifndef HALO_WORKER_H -#define HALO_WORKER_H +#ifndef RAY_WORKER_H +#define RAY_WORKER_H #include #include @@ -15,8 +15,8 @@ using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; -#include "halo.grpc.pb.h" -#include "halo/halo.h" +#include "ray.grpc.pb.h" +#include "ray/ray.h" #include "ipc.h" using grpc::Channel; diff --git a/test/arrays_test.py b/test/arrays_test.py index e395ae951..695c1672b 100644 --- a/test/arrays_test.py +++ b/test/arrays_test.py @@ -1,15 +1,15 @@ import unittest -import halo -import halo.serialization as serialization -import halo.services as services -import halo.worker as worker +import ray +import ray.serialization as serialization +import ray.services as services +import ray.worker as worker import numpy as np import time import subprocess32 as subprocess import os -import halo.arrays.remote as ra -import halo.arrays.distributed as da +import ray.arrays.remote as ra +import ray.arrays.distributed as da class ArraysSingleTest(unittest.TestCase): @@ -20,27 +20,27 @@ class ArraysSingleTest(unittest.TestCase): # test eye ref = ra.eye(3) - val = halo.pull(ref) + val = ray.pull(ref) self.assertTrue(np.alltrue(val == np.eye(3))) # test zeros ref = ra.zeros([3, 4, 5]) - val = halo.pull(ref) + val = ray.pull(ref) self.assertTrue(np.alltrue(val == np.zeros([3, 4, 5]))) # test qr - pass by value val_a = np.random.normal(size=[10, 13]) ref_q, ref_r = ra.linalg.qr(val_a) - val_q = halo.pull(ref_q) - val_r = halo.pull(ref_r) + val_q = ray.pull(ref_q) + val_r = ray.pull(ref_r) self.assertTrue(np.allclose(np.dot(val_q, val_r), val_a)) # test qr - pass by objref a = ra.random.normal([10, 13]) ref_q, ref_r = ra.linalg.qr(a) - val_a = halo.pull(a) - val_q = halo.pull(ref_q) - val_r = halo.pull(ref_r) + val_a = ray.pull(a) + val_q = ray.pull(ref_q) + val_r = ray.pull(ref_r) self.assertTrue(np.allclose(np.dot(val_q, val_r), val_a)) services.cleanup() @@ -51,7 +51,7 @@ class ArraysDistTest(unittest.TestCase): [w] = services.start_singlenode_cluster(return_drivers=True) x = da.DistArray() - x.construct([2, 3, 4], np.array([[[halo.push(0, w)]]])) + x.construct([2, 3, 4], np.array([[[ray.push(0, w)]]])) capsule, _ = serialization.serialize(w.handle, x) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE y = serialization.deserialize(w.handle, capsule) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE self.assertEqual(x.shape, y.shape) @@ -79,33 +79,33 @@ class ArraysDistTest(unittest.TestCase): x = da.zeros([9, 25, 51], "float") y = da.assemble(x) - self.assertTrue(np.alltrue(halo.pull(y) == np.zeros([9, 25, 51]))) + self.assertTrue(np.alltrue(ray.pull(y) == np.zeros([9, 25, 51]))) x = da.ones([11, 25, 49], dtype_name="float") y = da.assemble(x) - self.assertTrue(np.alltrue(halo.pull(y) == np.ones([11, 25, 49]))) + self.assertTrue(np.alltrue(ray.pull(y) == np.ones([11, 25, 49]))) x = da.random.normal([11, 25, 49]) y = da.copy(x) z = da.assemble(x) w = da.assemble(y) - self.assertTrue(np.alltrue(halo.pull(z) == halo.pull(w))) + self.assertTrue(np.alltrue(ray.pull(z) == ray.pull(w))) x = da.eye(25, dtype_name="float") y = da.assemble(x) - self.assertTrue(np.alltrue(halo.pull(y) == np.eye(25))) + self.assertTrue(np.alltrue(ray.pull(y) == np.eye(25))) x = da.random.normal([25, 49]) y = da.triu(x) z = da.assemble(y) w = da.assemble(x) - self.assertTrue(np.alltrue(halo.pull(z) == np.triu(halo.pull(w)))) + self.assertTrue(np.alltrue(ray.pull(z) == np.triu(ray.pull(w)))) x = da.random.normal([25, 49]) y = da.tril(x) z = da.assemble(y) w = da.assemble(x) - self.assertTrue(np.alltrue(halo.pull(z) == np.tril(halo.pull(w)))) + self.assertTrue(np.alltrue(ray.pull(z) == np.tril(ray.pull(w)))) x = da.random.normal([25, 49]) y = da.random.normal([49, 18]) @@ -113,8 +113,8 @@ class ArraysDistTest(unittest.TestCase): w = da.assemble(z) u = da.assemble(x) v = da.assemble(y) - np.allclose(halo.pull(w), np.dot(halo.pull(u), halo.pull(v))) - self.assertTrue(np.allclose(halo.pull(w), np.dot(halo.pull(u), halo.pull(v)))) + np.allclose(ray.pull(w), np.dot(ray.pull(u), ray.pull(v))) + self.assertTrue(np.allclose(ray.pull(w), np.dot(ray.pull(u), ray.pull(v)))) # test add x = da.random.normal([23, 42]) @@ -123,7 +123,7 @@ class ArraysDistTest(unittest.TestCase): z_full = da.assemble(z) x_full = da.assemble(x) y_full = da.assemble(y) - self.assertTrue(np.allclose(halo.pull(z_full), halo.pull(x_full) + halo.pull(y_full))) + self.assertTrue(np.allclose(ray.pull(z_full), ray.pull(x_full) + ray.pull(y_full))) # test subtract x = da.random.normal([33, 40]) @@ -132,14 +132,14 @@ class ArraysDistTest(unittest.TestCase): z_full = da.assemble(z) x_full = da.assemble(x) y_full = da.assemble(y) - self.assertTrue(np.allclose(halo.pull(z_full), halo.pull(x_full) - halo.pull(y_full))) + self.assertTrue(np.allclose(ray.pull(z_full), ray.pull(x_full) - ray.pull(y_full))) # test transpose x = da.random.normal([234, 432]) y = da.transpose(x) x_full = da.assemble(x) y_full = da.assemble(y) - self.assertTrue(np.alltrue(halo.pull(x_full).T == halo.pull(y_full))) + self.assertTrue(np.alltrue(ray.pull(x_full).T == ray.pull(y_full))) # test numpy_to_dist x = da.random.normal([23, 45]) @@ -148,8 +148,8 @@ class ArraysDistTest(unittest.TestCase): w = da.assemble(z) x_full = da.assemble(x) z_full = da.assemble(z) - self.assertTrue(np.alltrue(halo.pull(x_full) == halo.pull(z_full))) - self.assertTrue(np.alltrue(halo.pull(y) == halo.pull(w))) + self.assertTrue(np.alltrue(ray.pull(x_full) == ray.pull(z_full))) + self.assertTrue(np.alltrue(ray.pull(y) == ray.pull(w))) # test da.tsqr for shape in [[123, da.BLOCK_SIZE], [7, da.BLOCK_SIZE], [da.BLOCK_SIZE, da.BLOCK_SIZE], [da.BLOCK_SIZE, 7], [10 * da.BLOCK_SIZE, da.BLOCK_SIZE]]: @@ -157,10 +157,10 @@ class ArraysDistTest(unittest.TestCase): K = min(shape) q, r = da.linalg.tsqr(x) x_full = da.assemble(x) - x_val = halo.pull(x_full) + x_val = ray.pull(x_full) q_full = da.assemble(q) - q_val = halo.pull(q_full) - r_val = halo.pull(r) + q_val = ray.pull(q_full) + r_val = ray.pull(r) self.assertTrue(r_val.shape == (K, shape[1])) self.assertTrue(np.alltrue(r_val == np.triu(r_val))) self.assertTrue(np.allclose(x_val, np.dot(q_val, r_val))) @@ -174,12 +174,12 @@ class ArraysDistTest(unittest.TestCase): m = ra.random.normal([d1, d2]) q, r = ra.linalg.qr(m) l, u, s = da.linalg.modified_lu(da.numpy_to_dist(q)) - q_val = halo.pull(q) - r_val = halo.pull(r) + q_val = ray.pull(q) + r_val = ray.pull(r) l_full = da.assemble(l) - l_val = halo.pull(l_full) - u_val = halo.pull(u) - s_val = halo.pull(s) + l_val = ray.pull(l_full) + u_val = ray.pull(u) + s_val = ray.pull(s) s_mat = np.zeros((d1, d2)) for i in range(len(s_val)): s_mat[i, i] = s_val[i] @@ -196,17 +196,17 @@ class ArraysDistTest(unittest.TestCase): a = da.random.normal([d1, d2]) y, t, y_top, r = da.linalg.tsqr_hr(a) a_full = da.assemble(a) - a_val = halo.pull(a_full) + a_val = ray.pull(a_full) y_full = da.assemble(y) - y_val = halo.pull(y_full) - t_val = halo.pull(t) - y_top_val = halo.pull(y_top) - r_val = halo.pull(r) + y_val = ray.pull(y_full) + t_val = ray.pull(t) + y_top_val = ray.pull(y_top) + r_val = ray.pull(r) tall_eye = np.zeros((d1, min(d1, d2))) np.fill_diagonal(tall_eye, 1) q = tall_eye - np.dot(y_val, np.dot(t_val, y_top_val.T)) self.assertTrue(np.allclose(np.dot(q.T, q), np.eye(min(d1, d2)))) # check that q.T * q = I - self.assertTrue(np.allclose(np.dot(q, r_val), a_val)) # check that a = (I - y * t * y_thalo.T) * r + self.assertTrue(np.allclose(np.dot(q, r_val), a_val)) # check that a = (I - y * t * y_top.T) * r for d1, d2 in [(123, da.BLOCK_SIZE), (7, da.BLOCK_SIZE), (da.BLOCK_SIZE, da.BLOCK_SIZE), (da.BLOCK_SIZE, 7), (10 * da.BLOCK_SIZE, da.BLOCK_SIZE)]: test_dist_tsqr_hr(d1, d2) @@ -219,9 +219,9 @@ class ArraysDistTest(unittest.TestCase): a_full = da.assemble(a) q_full = da.assemble(q) r_full = da.assemble(r) - a_val = halo.pull(a_full) - q_val = halo.pull(q_full) - r_val = halo.pull(r_full) + a_val = ray.pull(a_full) + q_val = ray.pull(q_full) + r_val = ray.pull(r_full) self.assertTrue(q_val.shape == (d1, K)) self.assertTrue(r_val.shape == (K, d2)) diff --git a/test/gen-python-code.sh b/test/gen-python-code.sh index 8221c6b4b..62d10da6a 100644 --- a/test/gen-python-code.sh +++ b/test/gen-python-code.sh @@ -1,4 +1,4 @@ # For running the python tests -protoc -I ../protos/ --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../protos/halo.proto +protoc -I ../protos/ --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../protos/ray.proto protoc -I ../protos/ --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../protos/types.proto diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index ce144c1ce..0755d9eda 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -1,6 +1,6 @@ import unittest -import halo -import halo.services as services +import ray +import ray.services as services import time import os import numpy as np @@ -51,7 +51,7 @@ class MicroBenchmarkTest(unittest.TestCase): for _ in range(1000): start_time = time.time() x = test_functions.trivial_function() - halo.pull(x) + ray.pull(x) end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) @@ -67,7 +67,7 @@ class MicroBenchmarkTest(unittest.TestCase): elapsed_times = [] for _ in range(1000): start_time = time.time() - halo.push(1) + ray.push(1) end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) diff --git a/test/runtest.py b/test/runtest.py index c412ba014..4be6a907a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1,16 +1,16 @@ import unittest -import halo -import halo.serialization as serialization -import halo.services as services -import halo.worker as worker +import ray +import ray.serialization as serialization +import ray.services as services +import ray.worker as worker import numpy as np import time import subprocess32 as subprocess import os import test_functions -import halo.arrays.remote as ra -import halo.arrays.distributed as da +import ray.arrays.remote as ra +import ray.arrays.distributed as da class SerializationTest(unittest.TestCase): @@ -56,10 +56,10 @@ class SerializationTest(unittest.TestCase): self.numpyTypeTest(w, 'float32') self.numpyTypeTest(w, 'float64') - ref0 = halo.push(0, w) - ref1 = halo.push(0, w) - ref2 = halo.push(0, w) - ref3 = halo.push(0, w) + ref0 = ray.push(0, w) + ref1 = ray.push(0, w) + ref2 = ray.push(0, w) + ref3 = ray.push(0, w) a = np.array([[ref0, ref1], [ref2, ref3]]) capsule, _ = serialization.serialize(w.handle, a) result = serialization.deserialize(w.handle, capsule) @@ -75,8 +75,8 @@ class ObjStoreTest(unittest.TestCase): # pushing and pulling an object shouldn't change it for data in ["h", "h" * 10000, 0, 0.0]: - objref = halo.push(data, w1) - result = halo.pull(objref, w1) + objref = ray.push(data, w1) + result = ray.pull(objref, w1) self.assertEqual(result, data) # pushing an object, shipping it to another worker, and pulling it shouldn't change it @@ -129,7 +129,7 @@ class SchedulerTest(unittest.TestCase): time.sleep(0.2) - value_after = halo.pull(objref[0], w) + value_after = ray.pull(objref[0], w) self.assertEqual(value_before, value_after) time.sleep(0.1) @@ -143,26 +143,26 @@ class WorkerTest(unittest.TestCase): for i in range(100): value_before = i * 10 ** 6 - objref = halo.push(value_before, w) - value_after = halo.pull(objref, w) + objref = ray.push(value_before, w) + value_after = ray.pull(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = i * 10 ** 6 * 1.0 - objref = halo.push(value_before, w) - value_after = halo.pull(objref, w) + objref = ray.push(value_before, w) + value_after = ray.pull(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = "h" * i - objref = halo.push(value_before, w) - value_after = halo.pull(objref, w) + objref = ray.push(value_before, w) + value_after = ray.pull(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = [1] * i - objref = halo.push(value_before, w) - value_after = halo.pull(objref, w) + objref = ray.push(value_before, w) + value_after = ray.pull(objref, w) self.assertEqual(value_before, value_after) services.cleanup() @@ -175,11 +175,11 @@ class APITest(unittest.TestCase): [w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=test_path) objref = w.submit_task("test_functions.test_alias_f", []) - self.assertTrue(np.alltrue(halo.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(ray.pull(objref[0], w) == np.ones([3, 4, 5]))) objref = w.submit_task("test_functions.test_alias_g", []) - self.assertTrue(np.alltrue(halo.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(ray.pull(objref[0], w) == np.ones([3, 4, 5]))) objref = w.submit_task("test_functions.test_alias_h", []) - self.assertTrue(np.alltrue(halo.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(ray.pull(objref[0], w) == np.ones([3, 4, 5]))) services.cleanup() @@ -189,35 +189,35 @@ class APITest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path) x = test_functions.keyword_fct1(1) - self.assertEqual(halo.pull(x), "1 hello") + self.assertEqual(ray.pull(x), "1 hello") x = test_functions.keyword_fct1(1, "hi") - self.assertEqual(halo.pull(x), "1 hi") + self.assertEqual(ray.pull(x), "1 hi") x = test_functions.keyword_fct1(1, b="world") - self.assertEqual(halo.pull(x), "1 world") + self.assertEqual(ray.pull(x), "1 world") x = test_functions.keyword_fct2(a="w", b="hi") - self.assertEqual(halo.pull(x), "w hi") + self.assertEqual(ray.pull(x), "w hi") x = test_functions.keyword_fct2(b="hi", a="w") - self.assertEqual(halo.pull(x), "w hi") + self.assertEqual(ray.pull(x), "w hi") x = test_functions.keyword_fct2(a="w") - self.assertEqual(halo.pull(x), "w world") + self.assertEqual(ray.pull(x), "w world") x = test_functions.keyword_fct2(b="hi") - self.assertEqual(halo.pull(x), "hello hi") + self.assertEqual(ray.pull(x), "hello hi") x = test_functions.keyword_fct2("w") - self.assertEqual(halo.pull(x), "w world") + self.assertEqual(ray.pull(x), "w world") x = test_functions.keyword_fct2("w", "hi") - self.assertEqual(halo.pull(x), "w hi") + self.assertEqual(ray.pull(x), "w hi") x = test_functions.keyword_fct3(0, 1, c="w", d="hi") - self.assertEqual(halo.pull(x), "0 1 w hi") + self.assertEqual(ray.pull(x), "0 1 w hi") x = test_functions.keyword_fct3(0, 1, d="hi", c="w") - self.assertEqual(halo.pull(x), "0 1 w hi") + self.assertEqual(ray.pull(x), "0 1 w hi") x = test_functions.keyword_fct3(0, 1, c="w") - self.assertEqual(halo.pull(x), "0 1 w world") + self.assertEqual(ray.pull(x), "0 1 w world") x = test_functions.keyword_fct3(0, 1, d="hi") - self.assertEqual(halo.pull(x), "0 1 hello hi") + self.assertEqual(ray.pull(x), "0 1 hello hi") x = test_functions.keyword_fct3(0, 1) - self.assertEqual(halo.pull(x), "0 1 hello world") + self.assertEqual(ray.pull(x), "0 1 hello world") services.cleanup() @@ -227,9 +227,9 @@ class APITest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path) x = test_functions.varargs_fct1(0, 1, 2) - self.assertEqual(halo.pull(x), "0 1 2") + self.assertEqual(ray.pull(x), "0 1 2") x = test_functions.varargs_fct2(0, 1, 2) - self.assertEqual(halo.pull(x), "1 2") + self.assertEqual(ray.pull(x), "1 2") self.assertTrue(test_functions.kwargs_exception_thrown) self.assertTrue(test_functions.varargs_and_kwargs_exception_thrown) @@ -244,48 +244,48 @@ class ReferenceCountingTest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path) x = test_functions.test_alias_f() - halo.pull(x) + ray.pull(x) time.sleep(0.1) objref_val = x.val - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val] == 1) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val] == 1) del x - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val] == -1) # -1 indicates deallocated + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val] == -1) # -1 indicates deallocated y = test_functions.test_alias_h() - halo.pull(y) + ray.pull(y) time.sleep(0.1) objref_val = y.val - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 0, 0]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 0, 0]) del y - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) z = da.zeros([da.BLOCK_SIZE, 2 * da.BLOCK_SIZE], "float") time.sleep(0.1) objref_val = z.val - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) del z time.sleep(0.1) - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) x = ra.zeros([10, 10], "float") y = ra.zeros([10, 10], "float") z = ra.dot(x, y) objref_val = x.val time.sleep(0.1) - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) del x time.sleep(0.1) - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, 1, 1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, 1, 1]) del y time.sleep(0.1) - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, 1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, 1]) del z time.sleep(0.1) - self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) + self.assertTrue(ray.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) services.cleanup() diff --git a/test/shell.py b/test/shell.py deleted file mode 100644 index ce194d17a..000000000 --- a/test/shell.py +++ /dev/null @@ -1,28 +0,0 @@ -import argparse -import numpy as np - -import halo -import halo.services as services -import halo.worker as worker - -import test_functions -import halo.arrays.remote as ra -import halo.arrays.distributed as da - -from grpc.beta import implementations -import halo_pb2 -import types_pb2 - -TIMEOUT_SECONDS = 5 - -parser = argparse.ArgumentParser(description='Parse addresses for the worker to connect to.') -parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") -parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") -parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address") - -if __name__ == '__main__': - args = parser.parse_args() - worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) - - import IPython - IPython.embed() diff --git a/test/test_functions.py b/test/test_functions.py index 47ae396d9..fa3b9558f 100644 --- a/test/test_functions.py +++ b/test/test_functions.py @@ -1,70 +1,70 @@ -import halo +import ray import numpy as np # Test simple functionality -@halo.remote([str], [str]) +@ray.remote([str], [str]) def print_string(string): print "called print_string with", string f = open("asdfasdf.txt", "w") f.write("successfully called print_string with argument {}.".format(string)) return string -@halo.remote([int, int], [int, int]) +@ray.remote([int, int], [int, int]) def handle_int(a, b): return a + 1, b + 1 # Test aliasing -@halo.remote([], [np.ndarray]) +@ray.remote([], [np.ndarray]) def test_alias_f(): return np.ones([3, 4, 5]) -@halo.remote([], [np.ndarray]) +@ray.remote([], [np.ndarray]) def test_alias_g(): return test_alias_f() -@halo.remote([], [np.ndarray]) +@ray.remote([], [np.ndarray]) def test_alias_h(): return test_alias_g() # Test timing -@halo.remote([], []) +@ray.remote([], []) def empty_function(): return () -@halo.remote([], [int]) +@ray.remote([], [int]) def trivial_function(): return 1 # Test keyword arguments -@halo.remote([int, str], [str]) +@ray.remote([int, str], [str]) def keyword_fct1(a, b="hello"): return "{} {}".format(a, b) -@halo.remote([str, str], [str]) +@ray.remote([str, str], [str]) def keyword_fct2(a="hello", b="world"): return "{} {}".format(a, b) -@halo.remote([int, int, str, str], [str]) +@ray.remote([int, int, str, str], [str]) def keyword_fct3(a, b, c="hello", d="world"): return "{} {} {} {}".format(a, b, c, d) # Test variable numbers of arguments -@halo.remote([int], [str]) +@ray.remote([int], [str]) def varargs_fct1(*a): return " ".join(map(str, a)) -@halo.remote([int, int], [str]) +@ray.remote([int, int], [str]) def varargs_fct2(a, *b): return " ".join(map(str, b)) try: - @halo.remote([int], []) + @ray.remote([int], []) def kwargs_throw_exception(**c): return () kwargs_exception_thrown = False @@ -72,7 +72,7 @@ except: kwargs_exception_thrown = True try: - @halo.remote([int, str, int], [str]) + @ray.remote([int, str, int], [str]) def varargs_and_kwargs_throw_exception(a, b="hi", *c): return "{} {} {}".format(a, b, c) varargs_and_kwargs_exception_thrown = False diff --git a/test/test_worker.py b/test/test_worker.py index cff3127cf..51d5b4ef6 100644 --- a/test/test_worker.py +++ b/test/test_worker.py @@ -3,12 +3,12 @@ import argparse import numpy as np import test_functions -import halo.arrays.remote as ra -import halo.arrays.distributed as da +import ray.arrays.remote as ra +import ray.arrays.distributed as da -import halo -import halo.services as services -import halo.worker as worker +import ray +import ray.services as services +import ray.worker as worker parser = argparse.ArgumentParser(description='Parse addresses for the worker to connect to.') parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") @@ -19,13 +19,13 @@ if __name__ == '__main__': args = parser.parse_args() worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) - halo.register_module(test_functions) - halo.register_module(ra) - halo.register_module(ra.random) - halo.register_module(ra.linalg) - halo.register_module(da) - halo.register_module(da.random) - halo.register_module(da.linalg) - halo.register_module(sys.modules[__name__]) + ray.register_module(test_functions) + ray.register_module(ra) + ray.register_module(ra.random) + ray.register_module(ra.linalg) + ray.register_module(da) + ray.register_module(da.random) + ray.register_module(da.linalg) + ray.register_module(sys.modules[__name__]) worker.main_loop()