diff --git a/CMakeLists.txt b/CMakeLists.txt index d70d5a070..31b74e291 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 2.8) -project(orchestra) +project(halo) set(THIRDPARTY_DIR "${CMAKE_SOURCE_DIR}/thirdparty") @@ -20,15 +20,15 @@ include_directories("${NUMPY_INCLUDE_DIR}") set(PROTO_PATH "${CMAKE_SOURCE_DIR}/protos") -set(ORCHESTRA_PROTO "${PROTO_PATH}/orchestra.proto") +set(HALO_PROTO "${PROTO_PATH}/halo.proto") set(TYPES_PROTO "${PROTO_PATH}/types.proto") set(GENERATED_PROTOBUF_PATH "${CMAKE_BINARY_DIR}/generated") file(MAKE_DIRECTORY ${GENERATED_PROTOBUF_PATH}) -set(ORCHESTRA_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/orchestra.pb.cc") -set(ORCHESTRA_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/orchestra.pb.h") -set(ORCHESTRA_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/orchestra.grpc.pb.cc") -set(ORCHESTRA_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/orchestra.grpc.pb.h") +set(HALO_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/halo.pb.cc") +set(HALO_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/halo.pb.h") +set(HALO_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/halo.grpc.pb.cc") +set(HALO_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/halo.grpc.pb.h") set(TYPES_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.cc") set(TYPES_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.h") @@ -36,19 +36,19 @@ set(TYPES_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.cc") set(TYPES_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.h") add_custom_command( - OUTPUT "${ORCHESTRA_PB_H_FILE}" - "${ORCHESTRA_PB_CPP_FILE}" - "${ORCHESTRA_GRPC_PB_H_FILE}" - "${ORCHESTRA_GRPC_PB_CPP_FILE}" + OUTPUT "${HALO_PB_H_FILE}" + "${HALO_PB_CPP_FILE}" + "${HALO_GRPC_PB_H_FILE}" + "${HALO_GRPC_PB_CPP_FILE}" COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} ARGS "--proto_path=${PROTO_PATH}" "--cpp_out=${GENERATED_PROTOBUF_PATH}" - "${ORCHESTRA_PROTO}" + "${HALO_PROTO}" COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} ARGS "--proto_path=${PROTO_PATH}" 
"--grpc_out=${GENERATED_PROTOBUF_PATH}" "--plugin=protoc-gen-grpc=/usr/local/bin/grpc_cpp_plugin" - "${ORCHESTRA_PROTO}" + "${HALO_PROTO}" ) add_custom_command( @@ -67,8 +67,8 @@ add_custom_command( "${TYPES_PROTO}" ) -set(GENERATED_PROTOBUF_FILES ${ORCHESTRA_PB_H_FILE} ${ORCHESTRA_PB_CPP_FILE} - ${ORCHESTRA_GRPC_PB_H_FILE} ${ORCHESTRA_GRPC_PB_CPP_FILE} +set(GENERATED_PROTOBUF_FILES ${HALO_PB_H_FILE} ${HALO_PB_CPP_FILE} + ${HALO_GRPC_PB_H_FILE} ${HALO_GRPC_PB_CPP_FILE} ${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE} ${TYPES_GRPC_PB_H_FILE} ${TYPES_GRPC_PB_CPP_FILE}) @@ -81,7 +81,7 @@ endif() add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) target_link_libraries(objstore arrow numbuf pynumbuf) add_executable(scheduler src/scheduler.cc src/computation_graph.cc ${GENERATED_PROTOBUF_FILES}) -add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) -target_link_libraries(orchpylib arrow numbuf pynumbuf) +add_library(halolib SHARED src/halolib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) +target_link_libraries(halolib arrow numbuf pynumbuf) -install(TARGETS objstore scheduler orchpylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/orchpy/orchpy) +install(TARGETS objstore scheduler halolib DESTINATION ${CMAKE_SOURCE_DIR}/lib/python/halo) diff --git a/README.md b/README.md index 0aea9c26e..13107d5bb 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Orchestra +# Halo -Orchestra is a distributed execution framework with a Python-like programming model. +Halo is a distributed execution framework with a Python-like programming model. ## Design Decisions @@ -14,6 +14,6 @@ For a description of our design decisions, see 1. sudo apt-get update 2. sudo apt-get install git -3. git clone https://github.com/amplab/photon.git -4. cd photon +3. git clone https://github.com/amplab/halo.git +4. cd halo 5. 
bash setup.sh diff --git a/doc/aliasing.md b/doc/aliasing.md index 0fa507e26..4903f0e0f 100644 --- a/doc/aliasing.md +++ b/doc/aliasing.md @@ -1,6 +1,6 @@ # Aliasing -An important feature of Photon is that a remote call sent to the scheduler +An important feature of Halo is that a remote call sent to the scheduler immediately returns object references to the outputs of the task, and the actual outputs of the task are only associated with the relevant object references after the task has been executed and the outputs have been computed. This allows @@ -10,15 +10,15 @@ However, to provide a more flexible API, we allow tasks to not only return values, but to also return object references to values. As an examples, consider the following code. ```python -@op.distributed([], [np.ndarray]) +@halo.distributed([], [np.ndarray]) def f() return np.zeros(5) -@op.distributed([], [np.ndarray]) +@halo.distributed([], [np.ndarray]) def g() return f() -@op.distributed([], [np.ndarray]) +@halo.distributed([], [np.ndarray]) def h() return g() ``` diff --git a/doc/reference-counting.md b/doc/reference-counting.md index c9e7cbe8d..1d2a43c23 100644 --- a/doc/reference-counting.md +++ b/doc/reference-counting.md @@ -1,6 +1,6 @@ # Reference Counting -In Photon, each object is assigned a globally unique object reference by the +In Halo, each object is assigned a globally unique object reference by the scheduler (starting with 0 and incrementing upward). The objects are stored in object stores. In order to avoid running out of memory, the object stores must know when it is ok to deallocate an object. Since a worker on one node may have @@ -11,7 +11,7 @@ information. ## Reference Counting Two approaches to reclaiming memory are garbage collection and reference -counting. We choose to use a reference counting approach in Photon. There are a +counting. We choose to use a reference counting approach in Halo. There are a couple of reasons for this. 
Reference counting allows us to reclaim memory as early as possible. It also avoids pausing the system for garbage collection. We also note that implementing reference counting at the cluster level plays nicely @@ -77,13 +77,13 @@ because they must be passed into `AliasObjRefs` at some point). The following problem has not yet been resolved. In the following code, the result `x` will be garbage. ```python -x = orchpy.pull(single.zeros([10, 10], "float")) +x = halo.pull(single.zeros([10, 10], "float")) ``` When `single.zeros` is called, a worker will create an array of zeros and store it in an object store. An object reference to the output is returned. The call -to `orchpy.pull` will not copy data from the object store process to the worker +to `halo.pull` will not copy data from the object store process to the worker process, but will instead give the worker process a pointer to shared memory. -After the `orchpy.pull` call completes, the object reference returned by +After the `halo.pull` call completes, the object reference returned by `single.zeros` will go out of scope, and the object it refers to will be deallocated from the object store. This will cause the memory that `x` points to to be garbage. diff --git a/doc/scheduler.md b/doc/scheduler.md index 57f506826..17a8ec12a 100644 --- a/doc/scheduler.md +++ b/doc/scheduler.md @@ -1,6 +1,6 @@ # Scheduler -The scheduling strategies currently implemented in Photon are fairly basic and +The scheduling strategies currently implemented in Halo are fairly basic and all use a central scheduler. 
* The naive scheduler assigns tasks to workers just taking into account diff --git a/include/orchestra/orchestra.h b/include/halo/halo.h similarity index 77% rename from include/orchestra/orchestra.h rename to include/halo/halo.h index f0fbc8b98..3fbded72d 100644 --- a/include/orchestra/orchestra.h +++ b/include/halo/halo.h @@ -1,5 +1,5 @@ -#ifndef ORCHESTRA_INCLUDE_ORCHESTRA_H -#define ORCHESTRA_INCLUDE_ORCHESTRA_H +#ifndef HALO_INCLUDE_HALO_H +#define HALO_INCLUDE_HALO_H #include #include @@ -34,20 +34,20 @@ public: typedef std::vector > ObjTable; typedef std::unordered_map FnTable; -#define ORCH_VERBOSE -1 -#define ORCH_INFO 0 -#define ORCH_DEBUG 1 -#define ORCH_FATAL 2 -#define ORCH_REFCOUNT ORCH_VERBOSE -#define ORCH_ALIAS ORCH_VERBOSE +#define HALO_VERBOSE -1 +#define HALO_INFO 0 +#define HALO_DEBUG 1 +#define HALO_FATAL 2 +#define HALO_REFCOUNT HALO_VERBOSE +#define HALO_ALIAS HALO_VERBOSE -#define ORCH_LOG(LEVEL, MESSAGE) \ - if (LEVEL == ORCH_VERBOSE) { \ +#define HALO_LOG(LEVEL, MESSAGE) \ + if (LEVEL == HALO_VERBOSE) { \ \ - } else if (LEVEL == ORCH_FATAL) { \ + } else if (LEVEL == HALO_FATAL) { \ std::cerr << "fatal error occured: " << MESSAGE << std::endl; \ std::exit(1); \ - } else if (LEVEL == ORCH_DEBUG) { \ + } else if (LEVEL == HALO_DEBUG) { \ \ } else { \ std::cout << MESSAGE << std::endl; \ diff --git a/lib/orchpy/arrays/__init__.py b/lib/python/arrays/__init__.py similarity index 100% rename from lib/orchpy/arrays/__init__.py rename to lib/python/arrays/__init__.py diff --git a/lib/orchpy/arrays/dist/__init__.py b/lib/python/arrays/dist/__init__.py similarity index 100% rename from lib/orchpy/arrays/dist/__init__.py rename to lib/python/arrays/dist/__init__.py diff --git a/lib/orchpy/arrays/dist/core.py b/lib/python/arrays/dist/core.py similarity index 90% rename from lib/orchpy/arrays/dist/core.py rename to lib/python/arrays/dist/core.py index 37b54d41f..0aecca332 100644 --- a/lib/orchpy/arrays/dist/core.py +++ b/lib/python/arrays/dist/core.py 
@@ -1,7 +1,7 @@ from typing import List import numpy as np import arrays.single as single -import orchpy as op +import halo __all__ = ["BLOCK_SIZE", "DistArray", "assemble", "zeros", "ones", "copy", "eye", "triu", "tril", "blockwise_dot", "dot", "transpose", "add", "subtract", "numpy_to_dist", "subblocks"] @@ -55,13 +55,13 @@ class DistArray(object): def assemble(self): """Assemble an array on this node from a distributed array object reference.""" - first_block = op.pull(self.objrefs[(0,) * self.ndim]) + first_block = halo.pull(self.objrefs[(0,) * self.ndim]) dtype = first_block.dtype result = np.zeros(self.shape, dtype=dtype) for index in np.ndindex(*self.num_blocks): lower = DistArray.compute_block_lower(index, self.shape) upper = DistArray.compute_block_upper(index, self.shape) - result[[slice(l, u) for (l, u) in zip(lower, upper)]] = op.pull(self.objrefs[index]) + result[[slice(l, u) for (l, u) in zip(lower, upper)]] = halo.pull(self.objrefs[index]) return result def __getitem__(self, sliced): @@ -69,42 +69,42 @@ class DistArray(object): a = self.assemble() return a[sliced] -@op.distributed([DistArray], [np.ndarray]) +@halo.distributed([DistArray], [np.ndarray]) def assemble(a): return a.assemble() # TODO(rkn): what should we call this method -@op.distributed([np.ndarray], [DistArray]) +@halo.distributed([np.ndarray], [DistArray]) def numpy_to_dist(a): result = DistArray(a.shape) for index in np.ndindex(*result.num_blocks): lower = DistArray.compute_block_lower(index, a.shape) upper = DistArray.compute_block_upper(index, a.shape) - result.objrefs[index] = op.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) + result.objrefs[index] = halo.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) return result -@op.distributed([List[int], str], [DistArray]) +@halo.distributed([List[int], str], [DistArray]) def zeros(shape, dtype_name="float"): result = DistArray(shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = 
single.zeros(DistArray.compute_block_shape(index, shape), dtype_name=dtype_name) return result -@op.distributed([List[int], str], [DistArray]) +@halo.distributed([List[int], str], [DistArray]) def ones(shape, dtype_name="float"): result = DistArray(shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = single.ones(DistArray.compute_block_shape(index, shape), dtype_name=dtype_name) return result -@op.distributed([DistArray], [DistArray]) +@halo.distributed([DistArray], [DistArray]) def copy(a): result = DistArray(a.shape) for index in np.ndindex(*result.num_blocks): result.objrefs[index] = a.objrefs[index] # We don't need to actually copy the objects because cluster-level objects are assumed to be immutable. return result -@op.distributed([int, int, str], [DistArray]) +@halo.distributed([int, int, str], [DistArray]) def eye(dim1, dim2=-1, dtype_name="float"): dim2 = dim1 if dim2 == -1 else dim2 shape = [dim1, dim2] @@ -117,7 +117,7 @@ def eye(dim1, dim2=-1, dtype_name="float"): result.objrefs[i, j] = single.zeros(block_shape, dtype_name=dtype_name) return result -@op.distributed([DistArray], [DistArray]) +@halo.distributed([DistArray], [DistArray]) def triu(a): if a.ndim != 2: raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim)) @@ -131,7 +131,7 @@ def triu(a): result.objrefs[i, j] = single.zeros_like(a.objrefs[i, j]) return result -@op.distributed([DistArray], [DistArray]) +@halo.distributed([DistArray], [DistArray]) def tril(a): if a.ndim != 2: raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim)) @@ -145,7 +145,7 @@ def tril(a): result.objrefs[i, j] = single.zeros_like(a.objrefs[i, j]) return result -@op.distributed([np.ndarray, None], [np.ndarray]) +@halo.distributed([np.ndarray, None], [np.ndarray]) def blockwise_dot(*matrices): n = len(matrices) if n % 2 != 0: @@ -156,7 +156,7 @@ def blockwise_dot(*matrices): result += np.dot(matrices[i], matrices[n / 2 + i]) return result 
-@op.distributed([DistArray, DistArray], [DistArray]) +@halo.distributed([DistArray, DistArray], [DistArray]) def dot(a, b): if a.ndim != 2: raise Exception("dot expects its arguments to be 2-dimensional, but a.ndim = {}.".format(a.ndim)) @@ -172,7 +172,7 @@ def dot(a, b): return result # This is not in numpy, should we expose this? -@op.distributed([DistArray, List[int], None], [DistArray]) +@halo.distributed([DistArray, List[int], None], [DistArray]) def subblocks(a, *ranges): """ This function produces a distributed array from a subset of the blocks in the `a`. The result and `a` will have the same number of dimensions.For example, @@ -203,7 +203,7 @@ def subblocks(a, *ranges): result.objrefs[index] = a.objrefs[tuple([ranges[i][index[i]] for i in range(a.ndim)])] return result -@op.distributed([DistArray], [DistArray]) +@halo.distributed([DistArray], [DistArray]) def transpose(a): if a.ndim != 2: raise Exception("transpose expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape)) @@ -214,7 +214,7 @@ def transpose(a): return result # TODO(rkn): support broadcasting? -@op.distributed([DistArray, DistArray], [DistArray]) +@halo.distributed([DistArray, DistArray], [DistArray]) def add(x1, x2): if x1.shape != x2.shape: raise Exception("add expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape)) @@ -224,7 +224,7 @@ def add(x1, x2): return result # TODO(rkn): support broadcasting? 
-@op.distributed([DistArray, DistArray], [DistArray]) +@halo.distributed([DistArray, DistArray], [DistArray]) def subtract(x1, x2): if x1.shape != x2.shape: raise Exception("subtract expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape)) diff --git a/lib/orchpy/arrays/dist/linalg.py b/lib/python/arrays/dist/linalg.py similarity index 82% rename from lib/orchpy/arrays/dist/linalg.py rename to lib/python/arrays/dist/linalg.py index 9e58d3c62..bfc9e59ab 100644 --- a/lib/orchpy/arrays/dist/linalg.py +++ b/lib/python/arrays/dist/linalg.py @@ -2,13 +2,13 @@ from typing import List import numpy as np import arrays.single as single -import orchpy as op +import halo from core import * __all__ = ["tsqr", "modified_lu", "tsqr_hr", "qr"] -@op.distributed([DistArray], [DistArray, np.ndarray]) +@halo.distributed([DistArray], [DistArray, np.ndarray]) def tsqr(a): """ arguments: @@ -17,10 +17,10 @@ def tsqr(a): a.shape == (M, N) K == min(M, N) return values: - q: DistArray, if q_full = op.context.pull(DistArray, q).assemble(), then + q: DistArray, if q_full = halo.context.pull(DistArray, q).assemble(), then q_full.shape == (M, K) np.allclose(np.dot(q_full.T, q_full), np.eye(K)) == True - r: np.ndarray, if r_val = op.context.pull(np.ndarray, r), then + r: np.ndarray, if r_val = halo.context.pull(np.ndarray, r), then r_val.shape == (K, N) np.allclose(r, np.triu(r)) == True """ @@ -80,7 +80,7 @@ def tsqr(a): return q_result, r # TODO(rkn): This is unoptimized, we really want a block version of this. 
-@op.distributed([DistArray], [DistArray, np.ndarray, np.ndarray]) +@halo.distributed([DistArray], [DistArray, np.ndarray, np.ndarray]) def modified_lu(q): """ Algorithm 5 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf @@ -108,39 +108,39 @@ def modified_lu(q): for i in range(b): L[i, i] = 1 U = np.triu(q_work)[:b, :] - return numpy_to_dist(op.push(L)), U, S # TODO(rkn): get rid of push and pull + return numpy_to_dist(halo.push(L)), U, S # TODO(rkn): get rid of push and pull -@op.distributed([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray]) def tsqr_hr_helper1(u, s, y_top_block, b): y_top = y_top_block[:b, :b] s_full = np.diag(s) t = -1 * np.dot(u, np.dot(s_full, np.linalg.inv(y_top).T)) return t, y_top -@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray], [np.ndarray]) def tsqr_hr_helper2(s, r_temp): s_full = np.diag(s) return np.dot(s_full, r_temp) -@op.distributed([DistArray], [DistArray, np.ndarray, np.ndarray, np.ndarray]) +@halo.distributed([DistArray], [DistArray, np.ndarray, np.ndarray, np.ndarray]) def tsqr_hr(a): """Algorithm 6 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" q, r_temp = tsqr(a) y, u, s = modified_lu(q) - y_blocked = op.pull(y) + y_blocked = halo.pull(y) t, y_top = tsqr_hr_helper1(u, s, y_blocked.objrefs[0, 0], a.shape[1]) r = tsqr_hr_helper2(s, r_temp) return y, t, y_top, r -@op.distributed([np.ndarray, np.ndarray, np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray, np.ndarray, np.ndarray], [np.ndarray]) def qr_helper1(a_rc, y_ri, t, W_c): return a_rc - np.dot(y_ri, np.dot(t.T, W_c)) -@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray], [np.ndarray]) def qr_helper2(y_ri, a_rc): return np.dot(y_ri.T, a_rc) -@op.distributed([DistArray], [DistArray, DistArray]) 
+@halo.distributed([DistArray], [DistArray, DistArray]) def qr(a): """Algorithm 7 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" m, n = a.shape[0], a.shape[1] @@ -150,21 +150,21 @@ def qr(a): a_work = DistArray() a_work.construct(a.shape, np.copy(a.objrefs)) - result_dtype = np.linalg.qr(op.pull(a.objrefs[0, 0]))[0].dtype.name - r_res = op.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. - y_res = op.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + result_dtype = np.linalg.qr(halo.pull(a.objrefs[0, 0]))[0].dtype.name + r_res = halo.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + y_res = halo.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. Ts = [] for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0] sub_dist_array = subblocks(a_work, range(i, a_work.num_blocks[0]), [i]) y, t, _, R = tsqr_hr(sub_dist_array) - y_val = op.pull(y) + y_val = halo.pull(y) for j in range(i, a.num_blocks[0]): y_res.objrefs[j, i] = y_val.objrefs[j - i, 0] if a.shape[0] > a.shape[1]: # in this case, R needs to be square - R_shape = op.pull(single.shape(R)) + R_shape = halo.pull(single.shape(R)) eye_temp = single.eye(R_shape[1], R_shape[0], dtype_name=result_dtype) r_res.objrefs[i, i] = single.dot(eye_temp, R) else: diff --git a/lib/orchpy/arrays/dist/random.py b/lib/python/arrays/dist/random.py similarity index 86% rename from lib/orchpy/arrays/dist/random.py rename to lib/python/arrays/dist/random.py index 9e0464884..ed8b38d39 100644 --- a/lib/orchpy/arrays/dist/random.py +++ b/lib/python/arrays/dist/random.py @@ -2,11 +2,11 @@ from typing import List 
import numpy as np import arrays.single as single -import orchpy as op +import halo from core import * -@op.distributed([List[int]], [DistArray]) +@halo.distributed([List[int]], [DistArray]) def normal(shape): num_blocks = DistArray.compute_num_blocks(shape) objrefs = np.empty(num_blocks, dtype=object) diff --git a/lib/orchpy/arrays/single/__init__.py b/lib/python/arrays/single/__init__.py similarity index 100% rename from lib/orchpy/arrays/single/__init__.py rename to lib/python/arrays/single/__init__.py diff --git a/lib/orchpy/arrays/single/core.py b/lib/python/arrays/single/core.py similarity index 63% rename from lib/orchpy/arrays/single/core.py rename to lib/python/arrays/single/core.py index 227e5fcfd..d2487d46f 100644 --- a/lib/orchpy/arrays/single/core.py +++ b/lib/python/arrays/single/core.py @@ -1,80 +1,80 @@ from typing import List import numpy as np -import orchpy as op +import halo __all__ = ["zeros", "zeros_like", "ones", "eye", "dot", "vstack", "hstack", "subarray", "copy", "tril", "triu", "diag", "transpose", "add", "subtract", "sum", "shape"] -@op.distributed([List[int], str, str], [np.ndarray]) +@halo.distributed([List[int], str, str], [np.ndarray]) def zeros(shape, dtype_name="float", order="C"): return np.zeros(shape, dtype=np.dtype(dtype_name), order=order) -@op.distributed([np.ndarray, str, str, bool], [np.ndarray]) +@halo.distributed([np.ndarray, str, str, bool], [np.ndarray]) def zeros_like(a, dtype_name="None", order="K", subok=True): dtype_val = None if dtype_name == "None" else np.dtype(dtype_name) return np.zeros_like(a, dtype=dtype_val, order=order, subok=subok) -@op.distributed([List[int], str, str], [np.ndarray]) +@halo.distributed([List[int], str, str], [np.ndarray]) def ones(shape, dtype_name="float", order="C"): return np.ones(shape, dtype=np.dtype(dtype_name), order=order) -@op.distributed([int, int, int, str], [np.ndarray]) +@halo.distributed([int, int, int, str], [np.ndarray]) def eye(N, M=-1, k=0, dtype_name="float"): M = N if 
M == -1 else M return np.eye(N, M=M, k=k, dtype=np.dtype(dtype_name)) -@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray], [np.ndarray]) def dot(a, b): return np.dot(a, b) # TODO(rkn): My preferred signature would have been -# @op.distributed([List[np.ndarray]], [np.ndarray]) but that currently doesn't +# @halo.distributed([List[np.ndarray]], [np.ndarray]) but that currently doesn't # work because that would expect a list of ndarrays not a list of ObjRefs -@op.distributed([np.ndarray, None], [np.ndarray]) +@halo.distributed([np.ndarray, None], [np.ndarray]) def vstack(*xs): return np.vstack(xs) -@op.distributed([np.ndarray, None], [np.ndarray]) +@halo.distributed([np.ndarray, None], [np.ndarray]) def hstack(*xs): return np.hstack(xs) # TODO(rkn): this doesn't parallel the numpy API, but we can't really slice an ObjRef, think about this -@op.distributed([np.ndarray, List[int], List[int]], [np.ndarray]) +@halo.distributed([np.ndarray, List[int], List[int]], [np.ndarray]) def subarray(a, lower_indices, upper_indices): # TODO(rkn): be consistent about using "index" versus "indices" return a[[slice(l, u) for (l, u) in zip(lower_indices, upper_indices)]] -@op.distributed([np.ndarray, str], [np.ndarray]) +@halo.distributed([np.ndarray, str], [np.ndarray]) def copy(a, order="K"): return np.copy(a, order=order) -@op.distributed([np.ndarray, int], [np.ndarray]) +@halo.distributed([np.ndarray, int], [np.ndarray]) def tril(m, k=0): return np.tril(m, k=k) -@op.distributed([np.ndarray, int], [np.ndarray]) +@halo.distributed([np.ndarray, int], [np.ndarray]) def triu(m, k=0): return np.triu(m, k=k) -@op.distributed([np.ndarray, int], [np.ndarray]) +@halo.distributed([np.ndarray, int], [np.ndarray]) def diag(v, k=0): return np.diag(v, k=k) -@op.distributed([np.ndarray, List[int]], [np.ndarray]) +@halo.distributed([np.ndarray, List[int]], [np.ndarray]) def transpose(a, axes=[]): axes = None if axes == [] else axes return 
np.transpose(a, axes=axes) -@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray], [np.ndarray]) def add(x1, x2): return np.add(x1, x2) -@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray], [np.ndarray]) def subtract(x1, x2): return np.subtract(x1, x2) -@op.distributed([int, np.ndarray, None], [np.ndarray]) +@halo.distributed([int, np.ndarray, None], [np.ndarray]) def sum(axis, *xs): return np.sum(xs, axis=axis) -@op.distributed([np.ndarray], [tuple]) +@halo.distributed([np.ndarray], [tuple]) def shape(a): return np.shape(a) diff --git a/lib/orchpy/arrays/single/linalg.py b/lib/python/arrays/single/linalg.py similarity index 53% rename from lib/orchpy/arrays/single/linalg.py rename to lib/python/arrays/single/linalg.py index 5fbd7b4a1..68947a6be 100644 --- a/lib/orchpy/arrays/single/linalg.py +++ b/lib/python/arrays/single/linalg.py @@ -1,88 +1,88 @@ from typing import List import numpy as np -import orchpy as op +import halo __all__ = ["matrix_power", "solve", "tensorsolve", "tensorinv", "inv", "cholesky", "eigvals", "eigvalsh", "pinv", "slogdet", "det", "svd", "eig", "eigh", "lstsq", "norm", "qr", "cond", "matrix_rank", "LinAlgError", "multi_dot"] -@op.distributed([np.ndarray, int], [np.ndarray]) +@halo.distributed([np.ndarray, int], [np.ndarray]) def matrix_power(M, n): return np.linalg.matrix_power(M, n) -@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray, np.ndarray], [np.ndarray]) def solve(a, b): return np.linalg.solve(a, b) -@op.distributed([np.ndarray], [np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray]) def tensorsolve(a): raise NotImplementedError -@op.distributed([np.ndarray], [np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray]) def tensorinv(a): raise NotImplementedError -@op.distributed([np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray]) 
def inv(a): return np.linalg.inv(a) -@op.distributed([np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray]) def cholesky(a): return np.linalg.cholesky(a) -@op.distributed([np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray]) def eigvals(a): return np.linalg.eigvals(a) -@op.distributed([np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray]) def eigvalsh(a): raise NotImplementedError -@op.distributed([np.ndarray], [np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray]) def pinv(a): return np.linalg.pinv(a) -@op.distributed([np.ndarray], [int]) +@halo.distributed([np.ndarray], [int]) def slogdet(a): raise NotImplementedError -@op.distributed([np.ndarray], [float]) +@halo.distributed([np.ndarray], [float]) def det(a): return np.linalg.det(a) -@op.distributed([np.ndarray], [np.ndarray, np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray, np.ndarray]) def svd(a): return np.linalg.svd(a) -@op.distributed([np.ndarray], [np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray]) def eig(a): return np.linalg.eig(a) -@op.distributed([np.ndarray], [np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray]) def eigh(a): return np.linalg.eigh(a) -@op.distributed([np.ndarray], [np.ndarray, np.ndarray, int, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray, int, np.ndarray]) def lstsq(a, b): return np.linalg.lstsq(a) -@op.distributed([np.ndarray], [float]) +@halo.distributed([np.ndarray], [float]) def norm(x): return np.linalg.norm(x) -@op.distributed([np.ndarray], [np.ndarray, np.ndarray]) +@halo.distributed([np.ndarray], [np.ndarray, np.ndarray]) def qr(a): return np.linalg.qr(a) -@op.distributed([np.ndarray], [float]) +@halo.distributed([np.ndarray], [float]) def cond(x): return np.linalg.cond(x) -@op.distributed([np.ndarray], [int]) +@halo.distributed([np.ndarray], [int]) def matrix_rank(M): return 
np.linalg.matrix_rank(M) -@op.distributed([np.ndarray, None], [np.ndarray]) +@halo.distributed([np.ndarray, None], [np.ndarray]) def multi_dot(a): raise NotImplementedError diff --git a/lib/orchpy/arrays/single/random.py b/lib/python/arrays/single/random.py similarity index 61% rename from lib/orchpy/arrays/single/random.py rename to lib/python/arrays/single/random.py index f6b8c778c..75f434fb6 100644 --- a/lib/orchpy/arrays/single/random.py +++ b/lib/python/arrays/single/random.py @@ -1,7 +1,7 @@ from typing import List import numpy as np -import orchpy as op +import halo -@op.distributed([List[int]], [np.ndarray]) +@halo.distributed([List[int]], [np.ndarray]) def normal(shape): return np.random.normal(size=shape) diff --git a/lib/orchpy/orchpy/__init__.py b/lib/python/halo/__init__.py similarity index 81% rename from lib/orchpy/orchpy/__init__.py rename to lib/python/halo/__init__.py index a96e56c27..09acd89b6 100644 --- a/lib/orchpy/orchpy/__init__.py +++ b/lib/python/halo/__init__.py @@ -1,3 +1,3 @@ -import liborchpylib as lib +import libhalolib as lib import serialization from worker import scheduler_info, register_module, connect, disconnect, pull, push, distributed diff --git a/lib/orchpy/orchpy/serialization.py b/lib/python/halo/serialization.py similarity index 60% rename from lib/orchpy/orchpy/serialization.py rename to lib/python/halo/serialization.py index db76618f6..815b8ca7a 100644 --- a/lib/orchpy/orchpy/serialization.py +++ b/lib/python/halo/serialization.py @@ -1,6 +1,6 @@ import importlib -import orchpy +import halo def to_primitive(obj): if hasattr(obj, "serialize"): @@ -22,18 +22,18 @@ def from_primitive(primitive_obj): def serialize(worker_capsule, obj): primitive_obj = to_primitive(obj) - obj_capsule, contained_objrefs = orchpy.lib.serialize_object(worker_capsule, primitive_obj) # contained_objrefs is a list of the objrefs contained in obj + obj_capsule, contained_objrefs = halo.lib.serialize_object(worker_capsule, primitive_obj) # 
contained_objrefs is a list of the objrefs contained in obj return obj_capsule, contained_objrefs def deserialize(worker_capsule, capsule): - primitive_obj = orchpy.lib.deserialize_object(worker_capsule, capsule) + primitive_obj = halo.lib.deserialize_object(worker_capsule, capsule) return from_primitive(primitive_obj) def serialize_task(worker_capsule, func_name, args): - primitive_args = [(arg if isinstance(arg, orchpy.lib.ObjRef) else to_primitive(arg)) for arg in args] - return orchpy.lib.serialize_task(worker_capsule, func_name, primitive_args) + primitive_args = [(arg if isinstance(arg, halo.lib.ObjRef) else to_primitive(arg)) for arg in args] + return halo.lib.serialize_task(worker_capsule, func_name, primitive_args) def deserialize_task(worker_capsule, task): - func_name, primitive_args, return_objrefs = orchpy.lib.deserialize_task(worker_capsule, task) - args = [(arg if isinstance(arg, orchpy.lib.ObjRef) else from_primitive(arg)) for arg in primitive_args] + func_name, primitive_args, return_objrefs = halo.lib.deserialize_task(worker_capsule, task) + args = [(arg if isinstance(arg, halo.lib.ObjRef) else from_primitive(arg)) for arg in primitive_args] return func_name, args, return_objrefs diff --git a/lib/orchpy/orchpy/services.py b/lib/python/halo/services.py similarity index 91% rename from lib/orchpy/orchpy/services.py rename to lib/python/halo/services.py index bc42b1744..523225924 100644 --- a/lib/orchpy/orchpy/services.py +++ b/lib/python/halo/services.py @@ -3,8 +3,8 @@ import os import atexit import time -import orchpy -import orchpy.worker as worker +import halo +import halo.worker as worker _services_path = os.path.dirname(os.path.abspath(__file__)) @@ -58,9 +58,9 @@ def cleanup(): global drivers for driver in drivers: - orchpy.disconnect(driver) + halo.disconnect(driver) if len(drivers) == 0: - orchpy.disconnect() + halo.disconnect() drivers = [] # atexit.register(cleanup) @@ -97,7 +97,7 @@ def start_node(scheduler_address, node_ip_address, 
num_workers, worker_path=None for _ in range(num_workers): start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.3) - orchpy.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) + halo.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port())) time.sleep(0.5) def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None): @@ -124,11 +124,11 @@ def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_ driver_workers = [] for i in range(num_objstores): driver_worker = worker.Worker() - orchpy.connect(scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()), driver_worker) + halo.connect(scheduler_address, objstore_address, address(IP_ADDRESS, new_worker_port()), driver_worker) driver_workers.append(driver_worker) drivers.append(driver_worker) time.sleep(0.5) return driver_workers else: - orchpy.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port())) + halo.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port())) time.sleep(0.5) diff --git a/lib/orchpy/orchpy/worker.py b/lib/python/halo/worker.py similarity index 88% rename from lib/orchpy/orchpy/worker.py rename to lib/python/halo/worker.py index 37e24effd..a35638a84 100644 --- a/lib/orchpy/orchpy/worker.py +++ b/lib/python/halo/worker.py @@ -4,7 +4,7 @@ import funcsigs import numpy as np import pynumbuf -import orchpy +import halo import serialization class Worker(object): @@ -17,10 +17,10 @@ class Worker(object): def put_object(self, objref, value): """Put `value` in the local object store with objref `objref`. 
This assumes that the value for `objref` has not yet been placed in the local object store.""" if pynumbuf.serializable(value): - orchpy.lib.put_arrow(self.handle, objref, value) + halo.lib.put_arrow(self.handle, objref, value) else: object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule - orchpy.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) + halo.lib.put_object(self.handle, objref, object_capsule, contained_objrefs) def get_object(self, objref): """ @@ -29,32 +29,32 @@ class Worker(object): WARNING: get_object can only be called on a canonical objref. """ - if orchpy.lib.is_arrow(self.handle, objref): - return orchpy.lib.get_arrow(self.handle, objref) + if halo.lib.is_arrow(self.handle, objref): + return halo.lib.get_arrow(self.handle, objref) else: - object_capsule = orchpy.lib.get_object(self.handle, objref) + object_capsule = halo.lib.get_object(self.handle, objref) return serialization.deserialize(self.handle, object_capsule) def alias_objrefs(self, alias_objref, target_objref): """Make `alias_objref` refer to the same object that `target_objref` refers to.""" - orchpy.lib.alias_objrefs(self.handle, alias_objref, target_objref) + halo.lib.alias_objrefs(self.handle, alias_objref, target_objref) def register_function(self, function): """Notify the scheduler that this worker can execute the function with name `func_name`. Store the function `function` locally.""" - orchpy.lib.register_function(self.handle, function.func_name, len(function.return_types)) + halo.lib.register_function(self.handle, function.func_name, len(function.return_types)) self.functions[function.func_name] = function def submit_task(self, func_name, args): """Tell the scheduler to schedule the execution of the function with name `func_name` with arguments `args`. 
Retrieve object references for the outputs of the function from the scheduler and immediately return them.""" task_capsule = serialization.serialize_task(self.handle, func_name, args) - objrefs = orchpy.lib.submit_task(self.handle, task_capsule) + objrefs = halo.lib.submit_task(self.handle, task_capsule) return objrefs # We make `global_worker` a global variable so that there is one worker per worker process. global_worker = Worker() def scheduler_info(worker=global_worker): - return orchpy.lib.scheduler_info(worker.handle); + return halo.lib.scheduler_info(worker.handle); def register_module(module, recursive=False, worker=global_worker): print "registering functions in module {}.".format(module.__name__) @@ -69,32 +69,32 @@ def register_module(module, recursive=False, worker=global_worker): def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker): if hasattr(worker, "handle"): del worker.handle - worker.handle = orchpy.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) + worker.handle = halo.lib.create_worker(scheduler_addr, objstore_addr, worker_addr) def disconnect(worker=global_worker): - orchpy.lib.disconnect(worker.handle) + halo.lib.disconnect(worker.handle) def pull(objref, worker=global_worker): - orchpy.lib.request_object(worker.handle, objref) + halo.lib.request_object(worker.handle, objref) return worker.get_object(objref) def push(value, worker=global_worker): - objref = orchpy.lib.get_objref(worker.handle) + objref = halo.lib.get_objref(worker.handle) worker.put_object(objref, value) return objref def main_loop(worker=global_worker): - if not orchpy.lib.connected(worker.handle): + if not halo.lib.connected(worker.handle): raise Exception("Worker is attempting to enter main_loop but has not been connected yet.") - orchpy.lib.start_worker_service(worker.handle) + halo.lib.start_worker_service(worker.handle) def process_task(task): # wrapping these lines in a function should cause the local variables to go out of scope 
more quickly, which is useful for inspecting reference counts func_name, args, return_objrefs = serialization.deserialize_task(worker.handle, task) arguments = get_arguments_for_execution(worker.functions[func_name], args, worker) # get args from objstore outputs = worker.functions[func_name].executor(arguments) # execute the function store_outputs_in_objstore(return_objrefs, outputs, worker) # store output in local object store - orchpy.lib.notify_task_completed(worker.handle) # notify the scheduler that the task has completed + halo.lib.notify_task_completed(worker.handle) # notify the scheduler that the task has completed while True: - task = orchpy.lib.wait_for_next_task(worker.handle) + task = halo.lib.wait_for_next_task(worker.handle) process_task(task) def distributed(arg_types, return_types, worker=global_worker): @@ -132,7 +132,7 @@ def check_return_values(function, result): if len(result) != len(function.return_types): raise Exception("The @distributed decorator for function {} has {} return values with types {}, but {} returned {} values.".format(function.__name__, len(function.return_types), function.return_types, function.__name__, len(result))) for i in range(len(result)): - if (not isinstance(result[i], function.return_types[i])) and (not isinstance(result[i], orchpy.lib.ObjRef)): + if (not isinstance(result[i], function.return_types[i])) and (not isinstance(result[i], halo.lib.ObjRef)): raise Exception("The {}th return value for function {} has type {}, but the @distributed decorator expected a return value of type {} or an ObjRef.".format(i, function.__name__, type(result[i]), function.return_types[i])) # helper method, this should not be called by the user @@ -153,7 +153,7 @@ def check_arguments(function, args): else: assert False, "This code should be unreachable." - if isinstance(arg, orchpy.lib.ObjRef): + if isinstance(arg, halo.lib.ObjRef): # TODO(rkn): When we have type information in the ObjRef, do type checking here. 
pass else: @@ -182,7 +182,7 @@ def get_arguments_for_execution(function, args, worker=global_worker): else: assert False, "This code should be unreachable." - if isinstance(arg, orchpy.lib.ObjRef): + if isinstance(arg, halo.lib.ObjRef): # get the object from the local object store print "Getting argument {} for function {}.".format(i, function.__name__) argument = worker.get_object(arg) @@ -202,7 +202,7 @@ def store_outputs_in_objstore(objrefs, outputs, worker=global_worker): outputs = (outputs,) for i in range(len(objrefs)): - if isinstance(outputs[i], orchpy.lib.ObjRef): + if isinstance(outputs[i], halo.lib.ObjRef): # An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to print "Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val) worker.alias_objrefs(objrefs[i], outputs[i]) diff --git a/lib/orchpy/setup.py b/lib/python/setup.py similarity index 64% rename from lib/orchpy/setup.py rename to lib/python/setup.py index 55efc4312..6fdd1b8b5 100644 --- a/lib/orchpy/setup.py +++ b/lib/python/setup.py @@ -3,17 +3,17 @@ import sys from setuptools import setup, Extension, find_packages import setuptools -# because of relative paths, this must be run from inside orch/lib/orchpy/ +# because of relative paths, this must be run from inside halo/lib/python/ MACOSX = (sys.platform in ["darwin"]) setup( - name = "orchestra", + name = "halo", version = "0.1.dev0", use_2to3=True, packages=find_packages(), package_data = { - "orchpy": ["liborchpylib.dylib" if MACOSX else "liborchpylib.so", + "halo": ["libhalolib.dylib" if MACOSX else "libhalolib.so", "scheduler", "objstore"] }, diff --git a/protos/orchestra.proto b/protos/halo.proto similarity index 100% rename from protos/orchestra.proto rename to protos/halo.proto diff --git a/setup.sh b/setup.sh index b0aae4fc1..c4813638d 100644 --- a/setup.sh +++ b/setup.sh @@ -12,5 +12,5 @@ mkdir -p ../../../build cd ../../../build cmake .. 
sudo make install -cd ../lib/orchpy +cd ../lib/python sudo python setup.py install diff --git a/src/computation_graph.cc b/src/computation_graph.cc index bd309dec5..cbca5918c 100644 --- a/src/computation_graph.cc +++ b/src/computation_graph.cc @@ -4,7 +4,7 @@ OperationId ComputationGraph::add_operation(std::unique_ptr operation OperationId operationid = operations_.size(); OperationId creator_operationid = operation->creator_operationid(); if (spawned_operations_.size() != operationid) { - ORCH_LOG(ORCH_FATAL, "ComputationGraph is attempting to call add_operation, but spawned_operations_.size() != operationid."); + HALO_LOG(HALO_FATAL, "ComputationGraph is attempting to call add_operation, but spawned_operations_.size() != operationid."); } operations_.emplace_back(std::move(operation)); if (creator_operationid != NO_OPERATION && creator_operationid != ROOT_OPERATION) { @@ -16,10 +16,10 @@ OperationId ComputationGraph::add_operation(std::unique_ptr operation const Task& ComputationGraph::get_task(OperationId operationid) { if (operationid >= operations_.size()) { - ORCH_LOG(ORCH_FATAL, "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); + HALO_LOG(HALO_FATAL, "ComputationGraph attempting to get_task with operationid " << operationid << ", but operationid >= operations_.size()."); } if (!operations_[operationid]->has_task()) { - ORCH_LOG(ORCH_FATAL, "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); + HALO_LOG(HALO_FATAL, "Calling get_task with operationid " << operationid << ", but this corresponds to a push not a task."); } return operations_[operationid]->task(); } diff --git a/src/computation_graph.h b/src/computation_graph.h index 55a4923a7..494c94f2c 100644 --- a/src/computation_graph.h +++ b/src/computation_graph.h @@ -1,11 +1,11 @@ -#ifndef ORCHESTRA_COMPUTATIONGRAPH_H -#define ORCHESTRA_COMPUTATIONGRAPH_H +#ifndef 
HALO_COMPUTATIONGRAPH_H +#define HALO_COMPUTATIONGRAPH_H #include #include -#include "orchestra/orchestra.h" -#include "orchestra.grpc.pb.h" +#include "halo/halo.h" +#include "halo.grpc.pb.h" #include "types.pb.h" // used to represent the root operation (that is, the driver code) diff --git a/src/orchpylib.cc b/src/halolib.cc similarity index 95% rename from src/orchpylib.cc rename to src/halolib.cc index ffc773264..c48e36c0f 100644 --- a/src/orchpylib.cc +++ b/src/halolib.cc @@ -45,7 +45,7 @@ static int PyObjRef_init(PyObjRef *self, PyObject *args, PyObject *kwds) { } std::vector objrefs; objrefs.push_back(self->val); - ORCH_LOG(ORCH_REFCOUNT, "In PyObjRef_init, calling increment_reference_count for objref " << objrefs[0]); + HALO_LOG(HALO_REFCOUNT, "In PyObjRef_init, calling increment_reference_count for objref " << objrefs[0]); self->worker->increment_reference_count(objrefs); return 0; }; @@ -70,7 +70,7 @@ static PyMemberDef PyObjRef_members[] = { static PyTypeObject PyObjRefType = { PyObject_HEAD_INIT(NULL) 0, /* ob_size */ - "orchpy.ObjRef", /* tp_name */ + "halo.ObjRef", /* tp_name */ sizeof(PyObjRef), /* tp_basicsize */ 0, /* tp_itemsize */ (destructor)PyObjRef_dealloc, /* tp_dealloc */ @@ -89,7 +89,7 @@ static PyTypeObject PyObjRefType = { 0, /* tp_setattro */ 0, /* tp_as_buffer */ Py_TPFLAGS_DEFAULT, /* tp_flags */ - "OrchPy objects", /* tp_doc */ + "Halo objects", /* tp_doc */ 0, /* tp_traverse */ 0, /* tp_clear */ 0, /* tp_richcompare */ @@ -119,7 +119,7 @@ PyObject* make_pyobjref(PyObject* worker_capsule, ObjRef objref) { // Error handling -static PyObject *OrchPyError; +static PyObject *HaloError; // Pass arguments from Python to C++ @@ -312,12 +312,12 @@ int serialize(PyObject* worker_capsule, PyObject* val, Obj* obj, std::vector 0) { @@ -423,7 +423,7 @@ PyObject* deserialize(PyObject* worker_capsule, const Obj& obj, std::vector 0) { @@ -434,12 +434,12 @@ PyObject* deserialize(PyObject* worker_capsule, const Obj& obj, std::vector 0) { - 
ORCH_LOG(ORCH_REFCOUNT, "In serialize_task, calling increment_reference_count for contained objrefs"); + HALO_LOG(HALO_REFCOUNT, "In serialize_task, calling increment_reference_count for contained objrefs"); worker->increment_reference_count(objrefs); } return PyCapsule_New(static_cast(task), "task", &TaskCapsule_Destructor); @@ -584,7 +584,7 @@ PyObject* deserialize_task(PyObject* self, PyObject* args) { return t; } -// Orchestra Python API +// Halo Python API PyObject* create_worker(PyObject* self, PyObject* args) { const char* scheduler_addr; @@ -692,7 +692,7 @@ PyObject* put_object(PyObject* self, PyObject* args) { return NULL; } if (!PyList_Check(contained_objrefs)) { - ORCH_LOG(ORCH_FATAL, "The contained_objrefs argument must be a list.") + HALO_LOG(HALO_FATAL, "The contained_objrefs argument must be a list.") } std::vector vec_contained_objrefs; size_t size = PyList_Size(contained_objrefs); @@ -773,7 +773,7 @@ PyObject* scheduler_info(PyObject* self, PyObject* args) { return dict; } -static PyMethodDef OrchPyLibMethods[] = { +static PyMethodDef HaloLibMethods[] = { { "serialize_object", serialize_object, METH_VARARGS, "serialize an object to protocol buffers" }, { "deserialize_object", deserialize_object, METH_VARARGS, "deserialize an object from protocol buffers" }, { "put_arrow", put_arrow, METH_VARARGS, "put an arrow array on the local object store"}, @@ -798,18 +798,18 @@ static PyMethodDef OrchPyLibMethods[] = { { NULL, NULL, 0, NULL } }; -PyMODINIT_FUNC initliborchpylib(void) { +PyMODINIT_FUNC initlibhalolib(void) { PyObject* m; PyObjRefType.tp_new = PyType_GenericNew; if (PyType_Ready(&PyObjRefType) < 0) { return; } - m = Py_InitModule3("liborchpylib", OrchPyLibMethods, "Python C Extension for Orchestra"); + m = Py_InitModule3("libhalolib", HaloLibMethods, "Python C Extension for Halo"); Py_INCREF(&PyObjRefType); PyModule_AddObject(m, "ObjRef", (PyObject *)&PyObjRefType); - OrchPyError = PyErr_NewException("orchpy.error", NULL, NULL); - 
Py_INCREF(OrchPyError); - PyModule_AddObject(m, "error", OrchPyError); + HaloError = PyErr_NewException("halo.error", NULL, NULL); + Py_INCREF(HaloError); + PyModule_AddObject(m, "error", HaloError); import_array(); } diff --git a/src/ipc.cc b/src/ipc.cc index 0a7a2bb2d..5ff3de2e4 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -31,9 +31,9 @@ MemorySegmentPool::MemorySegmentPool(ObjStoreId objstoreid, bool create) : objst // creates a memory segment if it is not already there; if the pool is in create mode, // space is allocated, if it is in open mode, the shared memory is mapped into the process void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { - ORCH_LOG(ORCH_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_); + HALO_LOG(HALO_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_); if (segmentid != segments_.size() && create_mode_) { - ORCH_LOG(ORCH_FATAL, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); + HALO_LOG(HALO_FATAL, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size()); } if (segmentid >= segments_.size()) { // resize and initialize segments_ int current_size = segments_.size(); @@ -47,7 +47,7 @@ void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { return; } if (segments_[segmentid].second == SegmentStatusType::CLOSED) { - ORCH_LOG(ORCH_FATAL, "Attempting to open segmentid " << segmentid << ", but segments_[segmentid].second == SegmentStatusType::CLOSED."); + HALO_LOG(HALO_FATAL, "Attempting to open segmentid " << segmentid << ", but segments_[segmentid].second == SegmentStatusType::CLOSED."); } std::string segment_name = get_segment_name(segmentid); if (create_mode_) { @@ 
-61,7 +61,7 @@ void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { } void MemorySegmentPool::close_segment(SegmentId segmentid) { - ORCH_LOG(ORCH_DEBUG, "closing segmentid " << segmentid); + HALO_LOG(HALO_DEBUG, "closing segmentid " << segmentid); std::string segment_name = get_segment_name(segmentid); shared_memory_object::remove(segment_name.c_str()); segments_[segmentid].first.reset(); @@ -70,7 +70,7 @@ void MemorySegmentPool::close_segment(SegmentId segmentid) { ObjHandle MemorySegmentPool::allocate(size_t size) { if (!create_mode_) { // allocate is called only by the object store - ORCH_LOG(ORCH_FATAL, "Attempting to call allocate, but create_mode_ is false"); + HALO_LOG(HALO_FATAL, "Attempting to call allocate, but create_mode_ is false"); } // TODO(pcm): at the moment, this always creates a new segment, this will be changed SegmentId segmentid = segments_.size(); @@ -91,7 +91,7 @@ void MemorySegmentPool::deallocate(ObjHandle pointer) { // the process that will use the address uint8_t* MemorySegmentPool::get_address(ObjHandle pointer) { if (create_mode_ && segments_[pointer.segmentid()].second != SegmentStatusType::OPENED) { - ORCH_LOG(ORCH_FATAL, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet."); + HALO_LOG(HALO_FATAL, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet."); } if (!create_mode_) { open_segment(pointer.segmentid()); diff --git a/src/ipc.h b/src/ipc.h index 8051c9ce8..4ce4400e1 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -1,5 +1,5 @@ -#ifndef ORCHESTRA_IPC_H -#define ORCHESTRA_IPC_H +#ifndef HALO_IPC_H +#define HALO_IPC_H #include #include @@ -10,7 +10,7 @@ #include #include -#include "orchestra/orchestra.h" +#include "halo/halo.h" using namespace boost::interprocess; @@ -42,7 +42,7 @@ public: queue_ = std::unique_ptr(new 
message_queue(open_only, name.c_str())); } } catch(interprocess_exception &ex) { - ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what()); + HALO_LOG(HALO_FATAL, "boost::interprocess exception: " << ex.what()); } return true; }; @@ -55,7 +55,7 @@ public: try { queue_->send(object, sizeof(T), 0); } catch(interprocess_exception &ex) { - ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what()); + HALO_LOG(HALO_FATAL, "boost::interprocess exception: " << ex.what()); } return true; }; @@ -66,7 +66,7 @@ public: try { queue_->receive(object, sizeof(T), recvd_size, priority); } catch(interprocess_exception &ex) { - ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what()); + HALO_LOG(HALO_FATAL, "boost::interprocess exception: " << ex.what()); } return true; } diff --git a/src/objstore.cc b/src/objstore.cc index 83db32bde..4448edcf3 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -8,7 +8,7 @@ const size_t ObjStoreService::CHUNK_SIZE = 8 * 1024; // this method needs to be protected by a objstore_lock_ // TODO(rkn): Make sure that we do not in fact need the objstore_lock_. We want multiple deliveries to be able to happen simultaneously. 
void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { - ORCH_LOG(ORCH_DEBUG, "Objstore " << objstoreid_ << " is beginning to pull objref " << objref); + HALO_LOG(HALO_DEBUG, "Objstore " << objstoreid_ << " is beginning to pull objref " << objref); ObjChunk chunk; ClientContext context; StreamObjToRequest stream_request; @@ -27,7 +27,7 @@ void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { segmentpool_lock_.unlock(); do { if (num_bytes + chunk.data().size() > total_size) { - ORCH_LOG(ORCH_FATAL, "The reader attempted to stream too many bytes."); + HALO_LOG(HALO_FATAL, "The reader attempted to stream too many bytes."); } std::memcpy(data, chunk.data().c_str(), chunk.data().size()); data += chunk.data().size(); @@ -37,10 +37,10 @@ void ObjStoreService::pull_data_from(ObjRef objref, ObjStore::Stub& stub) { // finalize object if (num_bytes != total_size) { - ORCH_LOG(ORCH_FATAL, "Streamed objref " << objref << ", but num_bytes != total_size"); + HALO_LOG(HALO_FATAL, "Streamed objref " << objref << ", but num_bytes != total_size"); } object_ready(objref, chunk.metadata_offset()); - ORCH_LOG(ORCH_DEBUG, "finished streaming data, objref was " << objref << " and size was " << num_bytes); + HALO_LOG(HALO_DEBUG, "finished streaming data, objref was " << objref << " and size was " << num_bytes); } ObjStoreService::ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel) @@ -79,10 +79,10 @@ Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliver if (memory_[objref].second == MemoryStatusType::NOT_PRESENT) { } else if (memory_[objref].second == MemoryStatusType::DEALLOCATED) { - ORCH_LOG(ORCH_FATAL, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); + HALO_LOG(HALO_FATAL, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED."); } else { - 
ORCH_LOG(ORCH_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); + HALO_LOG(HALO_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again."); return Status::OK; } memory_[objref].second = MemoryStatusType::PRE_ALLOCED; @@ -115,15 +115,15 @@ Status ObjStoreService::ObjStoreInfo(ServerContext* context, const ObjStoreInfoR } Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToRequest* request, ServerWriter* writer) { - ORCH_LOG(ORCH_DEBUG, "begin to stream data from object store " << objstoreid_); + HALO_LOG(HALO_DEBUG, "begin to stream data from object store " << objstoreid_); ObjChunk chunk; ObjRef objref = request->objref(); memory_lock_.lock(); if (objref >= memory_.size()) { - ORCH_LOG(ORCH_FATAL, "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); + HALO_LOG(HALO_FATAL, "Objstore " << objstoreid_ << " is attempting to use objref " << objref << " in StreamObjTo, but this objref is not present in the object store."); } if (memory_[objref].second != MemoryStatusType::READY) { - ORCH_LOG(ORCH_FATAL, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); + HALO_LOG(HALO_FATAL, "Objstore " << objstoreid_ << " is attempting to stream objref " << objref << ", but memory_[objref].second != MemoryStatusType::READY."); } ObjHandle handle = memory_[objref].first; memory_lock_.unlock(); // TODO(rkn): Make sure we don't still need to hold on to this lock. 
@@ -136,7 +136,7 @@ Status ObjStoreService::StreamObjTo(ServerContext* context, const StreamObjToReq chunk.set_total_size(size); chunk.set_data(head + i, std::min(CHUNK_SIZE, size - i)); if (!writer->Write(chunk)) { - ORCH_LOG(ORCH_FATAL, "stream connection prematurely closed") + HALO_LOG(HALO_FATAL, "stream connection prematurely closed") } } return Status::OK; @@ -146,20 +146,20 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq // NotifyAlias assumes that the objstore already holds canonical_objref ObjRef alias_objref = request->alias_objref(); ObjRef canonical_objref = request->canonical_objref(); - ORCH_LOG(ORCH_DEBUG, "Aliasing objref " << alias_objref << " with objref " << canonical_objref); + HALO_LOG(HALO_DEBUG, "Aliasing objref " << alias_objref << " with objref " << canonical_objref); { std::lock_guard memory_lock(memory_lock_); if (canonical_objref >= memory_.size()) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") + HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") } if (memory_[canonical_objref].second == MemoryStatusType::NOT_READY) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") + HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") } if (memory_[canonical_objref].second == MemoryStatusType::NOT_PRESENT) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") + 
HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not present in the objstore.") } if (memory_[canonical_objref].second == MemoryStatusType::DEALLOCATED) { - ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") + HALO_LOG(HALO_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " has already been deallocated.") } if (alias_objref >= memory_.size()) { memory_.resize(alias_objref + 1, std::make_pair(ObjHandle(), MemoryStatusType::NOT_PRESENT)); @@ -176,13 +176,13 @@ Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasReq Status ObjStoreService::DeallocateObject(ServerContext* context, const DeallocateObjectRequest* request, AckReply* reply) { ObjRef canonical_objref = request->canonical_objref(); - ORCH_LOG(ORCH_REFCOUNT, "Deallocating canonical_objref " << canonical_objref); + HALO_LOG(HALO_REFCOUNT, "Deallocating canonical_objref " << canonical_objref); std::lock_guard memory_lock(memory_lock_); if (memory_[canonical_objref].second != MemoryStatusType::READY) { - ORCH_LOG(ORCH_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but memory_[canonical_objref].second = " << memory_[canonical_objref].second); + HALO_LOG(HALO_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but memory_[canonical_objref].second = " << memory_[canonical_objref].second); } if (canonical_objref >= memory_.size()) { - ORCH_LOG(ORCH_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); + HALO_LOG(HALO_FATAL, "Attempting to deallocate canonical_objref " << canonical_objref << ", but it is not in the objstore."); } segmentpool_lock_.lock(); 
segmentpool_->deallocate(memory_[canonical_objref].first); @@ -208,7 +208,7 @@ void ObjStoreService::process_objstore_request(const ObjRequest request) { } break; default: { - ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + HALO_LOG(HALO_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); } } } @@ -237,13 +237,13 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { std::lock_guard memory_lock(memory_lock_); std::pair& item = memory_[request.objref]; if (item.second == MemoryStatusType::READY) { - ORCH_LOG(ORCH_DEBUG, "Responding to GET request: returning objref " << request.objref); + HALO_LOG(HALO_DEBUG, "Responding to GET request: returning objref " << request.objref); send_queues_[request.workerid].send(&item.first); } else if (item.second == MemoryStatusType::NOT_READY || item.second == MemoryStatusType::NOT_PRESENT || item.second == MemoryStatusType::PRE_ALLOCED) { std::lock_guard lock(pull_queue_lock_); pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); } else { - ORCH_LOG(ORCH_FATAL, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); + HALO_LOG(HALO_FATAL, "A worker requested objref " << request.objref << ", but memory_[objref].second = " << memory_[request.objref].second); } } break; @@ -252,7 +252,7 @@ void ObjStoreService::process_worker_request(const ObjRequest request) { } break; default: { - ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + HALO_LOG(HALO_FATAL, "Attempting to process request of type " << request.type << ". 
This code should be unreachable."); } } } @@ -264,17 +264,17 @@ void ObjStoreService::process_requests() { recv_queue_.receive(&request); switch (request.type) { case ObjRequestType::ALLOC: { - ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Allocate object with objref " << request.objref << " and size " << request.size); + HALO_LOG(HALO_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Allocate object with objref " << request.objref << " and size " << request.size); process_worker_request(request); } break; case ObjRequestType::GET: { - ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Get object with objref " << request.objref); + HALO_LOG(HALO_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Get object with objref " << request.objref); process_worker_request(request); } break; case ObjRequestType::WORKER_DONE: { - ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Finalize object with objref " << request.objref); + HALO_LOG(HALO_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Finalize object with objref " << request.objref); process_worker_request(request); } break; @@ -283,7 +283,7 @@ void ObjStoreService::process_requests() { } break; default: { - ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + HALO_LOG(HALO_FATAL, "Attempting to process request of type " << request.type << ". 
This code should be unreachable."); } } } @@ -309,9 +309,9 @@ ObjHandle ObjStoreService::alloc(ObjRef objref, size_t size) { ObjHandle handle = segmentpool_->allocate(size); segmentpool_lock_.unlock(); std::lock_guard memory_lock(memory_lock_); - ORCH_LOG(ORCH_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_); + HALO_LOG(HALO_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_); if (memory_[objref].second != MemoryStatusType::NOT_PRESENT && memory_[objref].second != MemoryStatusType::PRE_ALLOCED) { - ORCH_LOG(ORCH_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); + HALO_LOG(HALO_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second); } memory_[objref].first = handle; memory_[objref].second = MemoryStatusType::NOT_READY; @@ -323,7 +323,7 @@ void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { std::lock_guard memory_lock(memory_lock_); std::pair& item = memory_[objref]; if (item.second != MemoryStatusType::NOT_READY) { - ORCH_LOG(ORCH_FATAL, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); + HALO_LOG(HALO_FATAL, "A worker notified the object store that objref " << objref << " has been written to the object store, but memory_[objref].second != NOT_READY."); } item.first.set_metadata_offset(metadata_offset); item.second = MemoryStatusType::READY; @@ -341,14 +341,14 @@ void ObjStoreService::object_ready(ObjRef objref, size_t metadata_offset) { void ObjStoreService::start_objstore_service() { communicator_thread_ = std::thread([this]() { - ORCH_LOG(ORCH_INFO, "started object store communicator server"); + HALO_LOG(HALO_INFO, "started object store communicator server"); process_requests(); }); } void start_objstore(const char* 
scheduler_addr, const char* objstore_addr) { auto scheduler_channel = grpc::CreateChannel(scheduler_addr, grpc::InsecureChannelCredentials()); - ORCH_LOG(ORCH_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr); + HALO_LOG(HALO_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr); std::string objstore_address(objstore_addr); ObjStoreService service(objstore_address, scheduler_channel); service.start_objstore_service(); @@ -365,7 +365,7 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) { int main(int argc, char** argv) { if (argc != 3) { - ORCH_LOG(ORCH_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)"); + HALO_LOG(HALO_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)"); return 1; } diff --git a/src/objstore.h b/src/objstore.h index 36cda7df9..ccc3076a6 100644 --- a/src/objstore.h +++ b/src/objstore.h @@ -1,5 +1,5 @@ -#ifndef ORCHESTRA_OBJSTORE_H -#define ORCHESTRA_OBJSTORE_H +#ifndef HALO_OBJSTORE_H +#define HALO_OBJSTORE_H #include #include @@ -7,8 +7,8 @@ #include #include -#include "orchestra/orchestra.h" -#include "orchestra.grpc.pb.h" +#include "halo/halo.h" +#include "halo.grpc.pb.h" #include "types.pb.h" #include "ipc.h" diff --git a/src/scheduler.cc b/src/scheduler.cc index 3aeaea056..b06421654 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -14,7 +14,7 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ if (fntable_.find(task->name()) == fntable_.end()) { // TODO(rkn): In the future, this should probably not be fatal. Instead, propagate the error back to the worker. 
- ORCH_LOG(ORCH_FATAL, "The function " << task->name() << " has not been registered by any worker."); + HALO_LOG(HALO_FATAL, "The function " << task->name() << " has not been registered by any worker."); } size_t num_return_vals = fntable_[task->name()].num_return_vals(); @@ -29,8 +29,8 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ } { std::lock_guard reference_counts_lock(reference_counts_lock_); // we grab this lock because increment_ref_count assumes it has been acquired - increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before we reply to the worker that called SubmitTask. The corresponding decrement will happen in submit_task in orchpylib. - increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before the task is scheduled on the worker. The corresponding decrement will happen in deserialize_task in orchpylib. + increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before we reply to the worker that called SubmitTask. The corresponding decrement will happen in submit_task in halolib. + increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before the task is scheduled on the worker. The corresponding decrement will happen in deserialize_task in halolib. 
} auto operation = std::unique_ptr(new Operation()); @@ -64,7 +64,7 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ ObjRef objref = request->objref(); if (objref >= size) { - ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << objref << " exists"); + HALO_LOG(HALO_FATAL, "internal error: no object with objref " << objref << " exists"); } pull_queue_lock_.lock(); @@ -77,23 +77,23 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) { ObjRef alias_objref = request->alias_objref(); ObjRef target_objref = request->target_objref(); - ORCH_LOG(ORCH_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); + HALO_LOG(HALO_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); if (alias_objref == target_objref) { - ORCH_LOG(ORCH_FATAL, "internal error: attempting to alias objref " << alias_objref << " with itself."); + HALO_LOG(HALO_FATAL, "internal error: attempting to alias objref " << alias_objref << " with itself."); } objtable_lock_.lock(); size_t size = objtable_.size(); objtable_lock_.unlock(); if (alias_objref >= size) { - ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << alias_objref << " exists"); + HALO_LOG(HALO_FATAL, "internal error: no object with objref " << alias_objref << " exists"); } if (target_objref >= size) { - ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << target_objref << " exists"); + HALO_LOG(HALO_FATAL, "internal error: no object with objref " << target_objref << " exists"); } { std::lock_guard target_objrefs_lock(target_objrefs_lock_); if (target_objrefs_[alias_objref] != UNITIALIZED_ALIAS) { - ORCH_LOG(ORCH_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased 
with objref " << target_objrefs_[alias_objref]); + HALO_LOG(HALO_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); } target_objrefs_[alias_objref] = target_objref; } @@ -121,7 +121,7 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo std::pair info = register_worker(request->worker_address(), request->objstore_address()); WorkerId workerid = info.first; ObjStoreId objstoreid = info.second; - ORCH_LOG(ORCH_INFO, "registered worker with workerid " << workerid); + HALO_LOG(HALO_INFO, "registered worker with workerid " << workerid); reply->set_workerid(workerid); reply->set_objstoreid(objstoreid); schedule(); @@ -129,7 +129,7 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo } Status SchedulerService::RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) { - ORCH_LOG(ORCH_INFO, "register function " << request->fnname() << " from workerid " << request->workerid()); + HALO_LOG(HALO_INFO, "register function " << request->fnname() << " from workerid " << request->workerid()); register_function(request->fnname(), request->workerid(), request->num_return_vals()); schedule(); return Status::OK; @@ -137,7 +137,7 @@ Status SchedulerService::RegisterFunction(ServerContext* context, const Register Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* request, AckReply* reply) { ObjRef objref = request->objref(); - ORCH_LOG(ORCH_DEBUG, "object " << objref << " ready on store " << request->objstoreid()); + HALO_LOG(HALO_DEBUG, "object " << objref << " ready on store " << request->objstoreid()); add_canonical_objref(objref); add_location(objref, request->objstoreid()); schedule(); @@ -145,7 +145,7 @@ Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* } 
Status SchedulerService::WorkerReady(ServerContext* context, const WorkerReadyRequest* request, AckReply* reply) { - ORCH_LOG(ORCH_INFO, "worker " << request->workerid() << " reported back"); + HALO_LOG(HALO_INFO, "worker " << request->workerid() << " reported back"); { std::lock_guard lock(avail_workers_lock_); avail_workers_.push_back(request->workerid()); @@ -157,7 +157,7 @@ Status SchedulerService::WorkerReady(ServerContext* context, const WorkerReadyRe Status SchedulerService::IncrementRefCount(ServerContext* context, const IncrementRefCountRequest* request, AckReply* reply) { int num_objrefs = request->objref_size(); if (num_objrefs == 0) { - ORCH_LOG(ORCH_FATAL, "Scheduler received IncrementRefCountRequest with 0 objrefs."); + HALO_LOG(HALO_FATAL, "Scheduler received IncrementRefCountRequest with 0 objrefs."); } std::vector objrefs; for (int i = 0; i < num_objrefs; ++i) { @@ -171,7 +171,7 @@ Status SchedulerService::IncrementRefCount(ServerContext* context, const Increme Status SchedulerService::DecrementRefCount(ServerContext* context, const DecrementRefCountRequest* request, AckReply* reply) { int num_objrefs = request->objref_size(); if (num_objrefs == 0) { - ORCH_LOG(ORCH_FATAL, "Scheduler received DecrementRefCountRequest with 0 objrefs."); + HALO_LOG(HALO_FATAL, "Scheduler received DecrementRefCountRequest with 0 objrefs."); } std::vector objrefs; for (int i = 0; i < num_objrefs; ++i) { @@ -186,11 +186,11 @@ Status SchedulerService::AddContainedObjRefs(ServerContext* context, const AddCo ObjRef objref = request->objref(); // if (!is_canonical(objref)) { // TODO(rkn): Perhaps we don't need this check. It won't work because the objstore may not have called ObjReady yet. 
- // ORCH_LOG(ORCH_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref); + // HALO_LOG(HALO_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref); // } std::lock_guard contained_objrefs_lock(contained_objrefs_lock_); if (contained_objrefs_[objref].size() != 0) { - ORCH_LOG(ORCH_FATAL, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); + HALO_LOG(HALO_FATAL, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); } for (int i = 0; i < request->contained_objref_size(); ++i) { contained_objrefs_[objref].push_back(request->contained_objref(i)); @@ -212,10 +212,10 @@ Status SchedulerService::SchedulerInfo(ServerContext* context, const SchedulerIn // deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) { if (from == to) { - ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); + HALO_LOG(HALO_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); } if (!has_canonical_objref(objref)) { - ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); + HALO_LOG(HALO_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); } ClientContext context; AckReply reply; @@ -235,7 +235,7 @@ void SchedulerService::schedule() { } else if (scheduling_algorithm_ == SCHEDULING_ALGORITHM_LOCALITY_AWARE) { schedule_tasks_location_aware(); // See what we can do in task_queue_ } else { - ORCH_LOG(ORCH_FATAL, "scheduling algorithm not known"); + HALO_LOG(HALO_FATAL, "scheduling algorithm not known"); } perform_notify_aliases(); // See what we can do 
in alias_notification_queue_ } @@ -247,7 +247,7 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { ClientContext context; ExecuteTaskRequest request; ExecuteTaskReply reply; - ORCH_LOG(ORCH_INFO, "starting to send arguments"); + HALO_LOG(HALO_INFO, "starting to send arguments"); for (size_t i = 0; i < task.arg_size(); ++i) { if (!task.arg(i).has_obj()) { ObjRef objref = task.arg(i).ref(); @@ -259,7 +259,7 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { } attempt_notify_alias(get_store(workerid), objref, canonical_objref); - ORCH_LOG(ORCH_DEBUG, "task contains object ref " << canonical_objref); + HALO_LOG(HALO_DEBUG, "task contains object ref " << canonical_objref); std::lock_guard objtable_lock(objtable_lock_); auto &objstores = objtable_[canonical_objref]; std::lock_guard workers_lock(workers_lock_); @@ -290,7 +290,7 @@ bool SchedulerService::can_run(const Task& task) { } std::pair SchedulerService::register_worker(const std::string& worker_address, const std::string& objstore_address) { - ORCH_LOG(ORCH_INFO, "registering worker " << worker_address << " connected to object store " << objstore_address); + HALO_LOG(HALO_INFO, "registering worker " << worker_address << " connected to object store " << objstore_address); ObjStoreId objstoreid = std::numeric_limits::max(); for (int num_attempts = 0; num_attempts < 5; ++num_attempts) { std::lock_guard lock(objstores_lock_); @@ -304,7 +304,7 @@ std::pair SchedulerService::register_worker(const std::str } } if (objstoreid == std::numeric_limits::max()) { - ORCH_LOG(ORCH_FATAL, "object store with address " << objstore_address << " not yet registered"); + HALO_LOG(HALO_FATAL, "object store with address " << objstore_address << " not yet registered"); } workers_lock_.lock(); WorkerId workerid = workers_.size(); @@ -334,16 +334,16 @@ ObjRef SchedulerService::register_new_object() { ObjRef reference_counts_size = reference_counts_.size(); ObjRef 
contained_objrefs_size = contained_objrefs_.size(); if (objtable_size != target_objrefs_size) { - ORCH_LOG(ORCH_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); + HALO_LOG(HALO_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); } if (objtable_size != reverse_target_objrefs_size) { - ORCH_LOG(ORCH_FATAL, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); + HALO_LOG(HALO_FATAL, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); } if (objtable_size != reference_counts_size) { - ORCH_LOG(ORCH_FATAL, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); + HALO_LOG(HALO_FATAL, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); } if (objtable_size != contained_objrefs_size) { - ORCH_LOG(ORCH_FATAL, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); + HALO_LOG(HALO_FATAL, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); } objtable_.push_back(std::vector()); target_objrefs_.push_back(UNITIALIZED_ALIAS); @@ -356,11 +356,11 @@ ObjRef SchedulerService::register_new_object() { void SchedulerService::add_location(ObjRef 
canonical_objref, ObjStoreId objstoreid) { // add_location must be called with a canonical objref if (!is_canonical(canonical_objref)) { - ORCH_LOG(ORCH_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); + HALO_LOG(HALO_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); } std::lock_guard objtable_lock(objtable_lock_); if (canonical_objref >= objtable_.size()) { - ORCH_LOG(ORCH_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); + HALO_LOG(HALO_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); } // do a binary search auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid); @@ -372,10 +372,10 @@ void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstore void SchedulerService::add_canonical_objref(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); if (objref >= target_objrefs_.size()) { - ORCH_LOG(ORCH_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); + HALO_LOG(HALO_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); } if (target_objrefs_[objref] != UNITIALIZED_ALIAS && target_objrefs_[objref] != objref) { - ORCH_LOG(ORCH_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); + HALO_LOG(HALO_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); } 
target_objrefs_[objref] = objref; } @@ -433,7 +433,7 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { std::mt19937 rng; if (!is_canonical(canonical_objref)) { - ORCH_LOG(ORCH_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); + HALO_LOG(HALO_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); } std::uniform_int_distribution uni(0, objtable_[canonical_objref].size() - 1); ObjStoreId objstoreid = objtable_[canonical_objref][uni(rng)]; @@ -443,7 +443,7 @@ ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { bool SchedulerService::is_canonical(ObjRef objref) { std::lock_guard lock(target_objrefs_lock_); if (target_objrefs_[objref] == UNITIALIZED_ALIAS) { - ORCH_LOG(ORCH_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); + HALO_LOG(HALO_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); } return objref == target_objrefs_[objref]; } @@ -456,11 +456,11 @@ void SchedulerService::perform_pulls() { ObjRef objref = pull.second; WorkerId workerid = pull.first; if (!has_canonical_objref(objref)) { - ORCH_LOG(ORCH_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing"); + HALO_LOG(HALO_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing"); continue; } ObjRef canonical_objref = get_canonical_objref(objref); - ORCH_LOG(ORCH_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); + HALO_LOG(HALO_DEBUG, "attempting to pull objref " 
<< pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); objtable_lock_.lock(); int num_stores = objtable_[canonical_objref].size(); @@ -538,7 +538,7 @@ void SchedulerService::schedule_tasks_location_aware() { if (!task.arg(j).has_obj()) { ObjRef objref = task.arg(j).ref(); if (!has_canonical_objref(objref)) { - ORCH_LOG(ORCH_FATAL, "no canonical object ref found even though task is ready; that should not be possible!"); + HALO_LOG(HALO_FATAL, "no canonical object ref found even though task is ready; that should not be possible!"); } ObjRef canonical_objref = get_canonical_objref(objref); // check if the object is already in the local object store @@ -585,7 +585,7 @@ bool SchedulerService::has_canonical_objref(ObjRef objref) { ObjRef objref_temp = objref; while (true) { if (objref_temp >= target_objrefs_.size()) { - ORCH_LOG(ORCH_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + HALO_LOG(HALO_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); } if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { return false; @@ -603,16 +603,16 @@ ObjRef SchedulerService::get_canonical_objref(ObjRef objref) { ObjRef objref_temp = objref; while (true) { if (objref_temp >= target_objrefs_.size()) { - ORCH_LOG(ORCH_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + HALO_LOG(HALO_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); } if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { - ORCH_LOG(ORCH_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for 
objref_temp = " << objref_temp << "."); + HALO_LOG(HALO_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); } if (target_objrefs_[objref_temp] == objref_temp) { return objref_temp; } objref_temp = target_objrefs_[objref_temp]; - ORCH_LOG(ORCH_ALIAS, "Looping in get_canonical_objref."); + HALO_LOG(HALO_ALIAS, "Looping in get_canonical_objref."); } } @@ -646,7 +646,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) { // these methods require reference_counts_lock_ to have been acquired, and // so the lock must before outside of these methods (it is acquired in // DecrementRefCount). - ORCH_LOG(ORCH_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << "."); + HALO_LOG(HALO_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << "."); { std::lock_guard objtable_lock(objtable_lock_); auto &objstores = objtable_[canonical_objref]; @@ -657,7 +657,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) { DeallocateObjectRequest request; request.set_canonical_objref(canonical_objref); ObjStoreId objstoreid = objstores[i]; - ORCH_LOG(ORCH_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid); + HALO_LOG(HALO_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid); objstores_[objstoreid].objstore_stub->DeallocateObject(&context, request, &reply); } objtable_[canonical_objref].clear(); @@ -670,10 +670,10 @@ void SchedulerService::increment_ref_count(std::vector &objrefs) { for (int i = 0; i < objrefs.size(); ++i) { ObjRef objref = objrefs[i]; if (reference_counts_[objref] == DEALLOCATED) { - ORCH_LOG(ORCH_FATAL, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); + 
HALO_LOG(HALO_FATAL, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); } reference_counts_[objref] += 1; - ORCH_LOG(ORCH_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << reference_counts_[objref]); + HALO_LOG(HALO_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << reference_counts_[objref]); } } @@ -682,13 +682,13 @@ void SchedulerService::decrement_ref_count(std::vector &objrefs) { for (int i = 0; i < objrefs.size(); ++i) { ObjRef objref = objrefs[i]; if (reference_counts_[objref] == DEALLOCATED) { - ORCH_LOG(ORCH_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); + HALO_LOG(HALO_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); } if (reference_counts_[objref] == 0) { - ORCH_LOG(ORCH_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); + HALO_LOG(HALO_FATAL, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); } reference_counts_[objref] -= 1; - ORCH_LOG(ORCH_REFCOUNT, "Decremented ref count for objref " << objref << ". New reference count is " << reference_counts_[objref]); + HALO_LOG(HALO_REFCOUNT, "Decremented ref count for objref " << objref << ". 
New reference count is " << reference_counts_[objref]); // See if we can deallocate the object std::vector equivalent_objrefs; get_equivalent_objrefs(objref, equivalent_objrefs); @@ -702,7 +702,7 @@ void SchedulerService::decrement_ref_count(std::vector &objrefs) { if (can_deallocate) { ObjRef canonical_objref = equivalent_objrefs[0]; if (!is_canonical(canonical_objref)) { - ORCH_LOG(ORCH_FATAL, "canonical_objref is not canonical."); + HALO_LOG(HALO_FATAL, "canonical_objref is not canonical."); } deallocate_object(canonical_objref); for (int j = 0; j < equivalent_objrefs.size(); ++j) { @@ -724,7 +724,7 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector std::lock_guard target_objrefs_lock(target_objrefs_lock_); ObjRef downstream_objref = objref; while (target_objrefs_[downstream_objref] != downstream_objref && target_objrefs_[downstream_objref] != UNITIALIZED_ALIAS) { - ORCH_LOG(ORCH_ALIAS, "Looping in get_equivalent_objrefs"); + HALO_LOG(HALO_ALIAS, "Looping in get_equivalent_objrefs"); downstream_objref = target_objrefs_[downstream_objref]; } std::lock_guard reverse_target_objrefs_lock(reverse_target_objrefs_lock_); @@ -755,7 +755,7 @@ char* get_cmd_option(char** begin, char** end, const std::string& option) { int main(int argc, char** argv) { SchedulingAlgorithmType scheduling_algorithm = SCHEDULING_ALGORITHM_LOCALITY_AWARE; if (argc < 2) { - ORCH_LOG(ORCH_FATAL, "scheduler: expected at least one argument (scheduler ip address)"); + HALO_LOG(HALO_FATAL, "scheduler: expected at least one argument (scheduler ip address)"); return 1; } if (argc > 2) { diff --git a/src/scheduler.h b/src/scheduler.h index d0bdf8182..326220250 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,5 +1,5 @@ -#ifndef ORCHESTRA_SCHEDULER_H -#define ORCHESTRA_SCHEDULER_H +#ifndef HALO_SCHEDULER_H +#define HALO_SCHEDULER_H #include @@ -10,8 +10,8 @@ #include -#include "orchestra/orchestra.h" -#include "orchestra.grpc.pb.h" +#include "halo/halo.h" +#include 
"halo.grpc.pb.h" #include "types.pb.h" #include "computation_graph.h" diff --git a/src/utils.h b/src/utils.h index 1c8d540a8..218c3c355 100644 --- a/src/utils.h +++ b/src/utils.h @@ -1,5 +1,5 @@ -#ifndef ORCHESTRA_UTILS_H -#define ORCHESTRA_UTILS_H +#ifndef HALO_UTILS_H +#define HALO_UTILS_H inline std::string::iterator split_ip_address(std::string& ip_address) { if (ip_address[0] == '[') { // IPv6 @@ -10,11 +10,11 @@ inline std::string::iterator split_ip_address(std::string& ip_address) { if(split_end != ip_address.end() && *split_end == ':') { return split_end; } - ORCH_LOG(ORCH_FATAL, "ip address should contain a port number"); + HALO_LOG(HALO_FATAL, "ip address should contain a port number"); } else { // IPv4 auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base(); if (split_point == ip_address.begin()) { - ORCH_LOG(ORCH_FATAL, "ip address should contain a port number"); + HALO_LOG(HALO_FATAL, "ip address should contain a port number"); } else { return split_point; } diff --git a/src/worker.cc b/src/worker.cc index 002c4521f..125a908a2 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -5,12 +5,12 @@ #include extern "C" { - static PyObject *OrchPyError; + static PyObject *HaloError; } Status WorkerServiceImpl::ExecuteTask(ServerContext* context, const ExecuteTaskRequest* request, ExecuteTaskReply* reply) { task_ = request->task(); // Copy task - ORCH_LOG(ORCH_INFO, "invoked task " << request->task().name()); + HALO_LOG(HALO_INFO, "invoked task " << request->task().name()); Task* taskptr = &task_; send_queue_.send(&taskptr); return Status::OK; @@ -26,7 +26,7 @@ Worker::Worker(const std::string& worker_address, std::shared_ptr sched SubmitTaskReply Worker::submit_task(SubmitTaskRequest* request) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform submit_task, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform submit_task, but connected_ = " << connected_ << "."); } SubmitTaskReply reply; 
ClientContext context; @@ -52,7 +52,7 @@ void Worker::register_worker(const std::string& worker_address, const std::strin void Worker::request_object(ObjRef objref) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform request_object, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform request_object, but connected_ = " << connected_ << "."); } RequestObjRequest request; request.set_workerid(workerid_); @@ -66,7 +66,7 @@ void Worker::request_object(ObjRef objref) { ObjRef Worker::get_objref() { // first get objref for the new object if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform get_objref, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform get_objref, but connected_ = " << connected_ << "."); } PushObjRequest push_request; PushObjReply push_reply; @@ -78,7 +78,7 @@ ObjRef Worker::get_objref() { slice Worker::get_object(ObjRef objref) { // get_object assumes that objref is a canonical objref if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform get_object, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform get_object, but connected_ = " << connected_ << "."); } ObjRequest request; request.workerid = workerid_; @@ -97,7 +97,7 @@ slice Worker::get_object(ObjRef objref) { // contained_objrefs is a vector of all the objrefs contained in obj void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &contained_objrefs) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform put_object, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform put_object, but connected_ = " << connected_ << "."); } std::string data; obj->SerializeToString(&data); // TODO(pcm): get rid of this serialization @@ -108,7 +108,7 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont request.size = data.size(); request_obj_queue_.send(&request); if (contained_objrefs.size() > 
0) { - ORCH_LOG(ORCH_REFCOUNT, "In put_object, calling increment_reference_count for contained objrefs"); + HALO_LOG(HALO_REFCOUNT, "In put_object, calling increment_reference_count for contained objrefs"); increment_reference_count(contained_objrefs); // Notify the scheduler that some object references are serialized in the objstore. } ObjHandle result; @@ -135,14 +135,14 @@ void Worker::put_object(ObjRef objref, const Obj* obj, std::vector &cont arrow::Status _s = (s); \ if (!_s.ok()) { \ std::string _errmsg = std::string(msg) + _s.ToString(); \ - PyErr_SetString(OrchPyError, _errmsg.c_str()); \ + PyErr_SetString(HaloError, _errmsg.c_str()); \ return NULL; \ } \ } while (0); PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform put_arrow, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform put_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; pynumbuf::PythonObjectWriter writer; @@ -168,7 +168,7 @@ PyObject* Worker::put_arrow(ObjRef objref, PyObject* value) { PyObject* Worker::get_arrow(ObjRef objref) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform get_arrow, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform get_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; request.workerid = workerid_; @@ -186,7 +186,7 @@ PyObject* Worker::get_arrow(ObjRef objref) { bool Worker::is_arrow(ObjRef objref) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform is_arrow, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform is_arrow, but connected_ = " << connected_ << "."); } ObjRequest request; request.workerid = workerid_; @@ -200,7 +200,7 @@ bool Worker::is_arrow(ObjRef objref) { void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform 
alias_objrefs, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform alias_objrefs, but connected_ = " << connected_ << "."); } ClientContext context; AliasObjRefsRequest request; @@ -212,14 +212,14 @@ void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { void Worker::increment_reference_count(std::vector &objrefs) { if (!connected_) { - ORCH_LOG(ORCH_DEBUG, "Attempting to increment_reference_count for objrefs, but connected_ = " << connected_ << " so returning instead."); + HALO_LOG(HALO_DEBUG, "Attempting to increment_reference_count for objrefs, but connected_ = " << connected_ << " so returning instead."); return; } if (objrefs.size() > 0) { ClientContext context; IncrementRefCountRequest request; for (int i = 0; i < objrefs.size(); ++i) { - ORCH_LOG(ORCH_REFCOUNT, "Incrementing reference count for objref " << objrefs[i]); + HALO_LOG(HALO_REFCOUNT, "Incrementing reference count for objref " << objrefs[i]); request.add_objref(objrefs[i]); } AckReply reply; @@ -229,14 +229,14 @@ void Worker::increment_reference_count(std::vector &objrefs) { void Worker::decrement_reference_count(std::vector &objrefs) { if (!connected_) { - ORCH_LOG(ORCH_DEBUG, "Attempting to decrement_reference_count, but connected_ = " << connected_ << " so returning instead."); + HALO_LOG(HALO_DEBUG, "Attempting to decrement_reference_count, but connected_ = " << connected_ << " so returning instead."); return; } if (objrefs.size() > 0) { ClientContext context; DecrementRefCountRequest request; for (int i = 0; i < objrefs.size(); ++i) { - ORCH_LOG(ORCH_REFCOUNT, "Decrementing reference count for objref " << objrefs[i]); + HALO_LOG(HALO_REFCOUNT, "Decrementing reference count for objref " << objrefs[i]); request.add_objref(objrefs[i]); } AckReply reply; @@ -246,7 +246,7 @@ void Worker::decrement_reference_count(std::vector &objrefs) { void Worker::register_function(const std::string& name, size_t num_return_vals) { if (!connected_) { - 
ORCH_LOG(ORCH_FATAL, "Attempting to perform register_function, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform register_function, but connected_ = " << connected_ << "."); } ClientContext context; RegisterFunctionRequest request; @@ -265,7 +265,7 @@ Task* Worker::receive_next_task() { void Worker::notify_task_completed() { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to perform notify_task_completed, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to perform notify_task_completed, but connected_ = " << connected_ << "."); } ClientContext context; WorkerReadyRequest request; @@ -285,7 +285,7 @@ bool Worker::connected() { // TODO(rkn): Should we be using pointers or references? And should they be const? void Worker::scheduler_info(ClientContext &context, SchedulerInfoRequest &request, SchedulerInfoReply &reply) { if (!connected_) { - ORCH_LOG(ORCH_FATAL, "Attempting to get scheduler info, but connected_ = " << connected_ << "."); + HALO_LOG(HALO_FATAL, "Attempting to get scheduler info, but connected_ = " << connected_ << "."); } scheduler_stub_->SchedulerInfo(&context, request, &reply); } @@ -306,7 +306,7 @@ void Worker::start_worker_service() { builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptr server(builder.BuildAndStart()); - ORCH_LOG(ORCH_INFO, "worker server listening on " << service_address); + HALO_LOG(HALO_INFO, "worker server listening on " << service_address); server->Wait(); }); } diff --git a/src/worker.h b/src/worker.h index bdec7dfd5..f827cbe22 100644 --- a/src/worker.h +++ b/src/worker.h @@ -1,5 +1,5 @@ -#ifndef ORCHESTRA_WORKER_H -#define ORCHESTRA_WORKER_H +#ifndef HALO_WORKER_H +#define HALO_WORKER_H #include #include @@ -15,8 +15,8 @@ using grpc::ServerBuilder; using grpc::ServerContext; using grpc::Status; -#include "orchestra.grpc.pb.h" -#include "orchestra/orchestra.h" 
+#include "halo.grpc.pb.h" +#include "halo/halo.h" #include "ipc.h" using grpc::Channel; diff --git a/test/arrays_test.py b/test/arrays_test.py index 7b9a9d720..2bf2c5cf6 100644 --- a/test/arrays_test.py +++ b/test/arrays_test.py @@ -1,8 +1,8 @@ import unittest -import orchpy -import orchpy.serialization as serialization -import orchpy.services as services -import orchpy.worker as worker +import halo +import halo.serialization as serialization +import halo.services as services +import halo.worker as worker import numpy as np import time import subprocess32 as subprocess @@ -14,7 +14,7 @@ import arrays.dist as dist from google.protobuf.text_format import * from grpc.beta import implementations -import orchestra_pb2 +import halo_pb2 import types_pb2 class ArraysSingleTest(unittest.TestCase): @@ -26,27 +26,27 @@ class ArraysSingleTest(unittest.TestCase): # test eye ref = single.eye(3) - val = orchpy.pull(ref) + val = halo.pull(ref) self.assertTrue(np.alltrue(val == np.eye(3))) # test zeros ref = single.zeros([3, 4, 5]) - val = orchpy.pull(ref) + val = halo.pull(ref) self.assertTrue(np.alltrue(val == np.zeros([3, 4, 5]))) # test qr - pass by value val_a = np.random.normal(size=[10, 13]) ref_q, ref_r = single.linalg.qr(val_a) - val_q = orchpy.pull(ref_q) - val_r = orchpy.pull(ref_r) + val_q = halo.pull(ref_q) + val_r = halo.pull(ref_r) self.assertTrue(np.allclose(np.dot(val_q, val_r), val_a)) # test qr - pass by objref a = single.random.normal([10, 13]) ref_q, ref_r = single.linalg.qr(a) - val_a = orchpy.pull(a) - val_q = orchpy.pull(ref_q) - val_r = orchpy.pull(ref_r) + val_a = halo.pull(a) + val_q = halo.pull(ref_q) + val_r = halo.pull(ref_r) self.assertTrue(np.allclose(np.dot(val_q, val_r), val_a)) services.cleanup() @@ -57,7 +57,7 @@ class ArraysDistTest(unittest.TestCase): [w] = services.start_singlenode_cluster(return_drivers=True) x = dist.DistArray() - x.construct([2, 3, 4], np.array([[[orchpy.push(0, w)]]])) + x.construct([2, 3, 4], np.array([[[halo.push(0, 
w)]]])) capsule, _ = serialization.serialize(w.handle, x) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE y = serialization.deserialize(w.handle, capsule) # TODO(rkn): THIS REQUIRES A WORKER_HANDLE self.assertEqual(x.shape, y.shape) @@ -85,33 +85,33 @@ class ArraysDistTest(unittest.TestCase): x = dist.zeros([9, 25, 51], "float") y = dist.assemble(x) - self.assertTrue(np.alltrue(orchpy.pull(y) == np.zeros([9, 25, 51]))) + self.assertTrue(np.alltrue(halo.pull(y) == np.zeros([9, 25, 51]))) x = dist.ones([11, 25, 49], dtype_name="float") y = dist.assemble(x) - self.assertTrue(np.alltrue(orchpy.pull(y) == np.ones([11, 25, 49]))) + self.assertTrue(np.alltrue(halo.pull(y) == np.ones([11, 25, 49]))) x = dist.random.normal([11, 25, 49]) y = dist.copy(x) z = dist.assemble(x) w = dist.assemble(y) - self.assertTrue(np.alltrue(orchpy.pull(z) == orchpy.pull(w))) + self.assertTrue(np.alltrue(halo.pull(z) == halo.pull(w))) x = dist.eye(25, dtype_name="float") y = dist.assemble(x) - self.assertTrue(np.alltrue(orchpy.pull(y) == np.eye(25))) + self.assertTrue(np.alltrue(halo.pull(y) == np.eye(25))) x = dist.random.normal([25, 49]) y = dist.triu(x) z = dist.assemble(y) w = dist.assemble(x) - self.assertTrue(np.alltrue(orchpy.pull(z) == np.triu(orchpy.pull(w)))) + self.assertTrue(np.alltrue(halo.pull(z) == np.triu(halo.pull(w)))) x = dist.random.normal([25, 49]) y = dist.tril(x) z = dist.assemble(y) w = dist.assemble(x) - self.assertTrue(np.alltrue(orchpy.pull(z) == np.tril(orchpy.pull(w)))) + self.assertTrue(np.alltrue(halo.pull(z) == np.tril(halo.pull(w)))) x = dist.random.normal([25, 49]) y = dist.random.normal([49, 18]) @@ -119,8 +119,8 @@ class ArraysDistTest(unittest.TestCase): w = dist.assemble(z) u = dist.assemble(x) v = dist.assemble(y) - np.allclose(orchpy.pull(w), np.dot(orchpy.pull(u), orchpy.pull(v))) - self.assertTrue(np.allclose(orchpy.pull(w), np.dot(orchpy.pull(u), orchpy.pull(v)))) + np.allclose(halo.pull(w), np.dot(halo.pull(u), halo.pull(v))) + 
self.assertTrue(np.allclose(halo.pull(w), np.dot(halo.pull(u), halo.pull(v)))) # test add x = dist.random.normal([23, 42]) @@ -129,7 +129,7 @@ class ArraysDistTest(unittest.TestCase): z_full = dist.assemble(z) x_full = dist.assemble(x) y_full = dist.assemble(y) - self.assertTrue(np.allclose(orchpy.pull(z_full), orchpy.pull(x_full) + orchpy.pull(y_full))) + self.assertTrue(np.allclose(halo.pull(z_full), halo.pull(x_full) + halo.pull(y_full))) # test subtract x = dist.random.normal([33, 40]) @@ -138,14 +138,14 @@ class ArraysDistTest(unittest.TestCase): z_full = dist.assemble(z) x_full = dist.assemble(x) y_full = dist.assemble(y) - self.assertTrue(np.allclose(orchpy.pull(z_full), orchpy.pull(x_full) - orchpy.pull(y_full))) + self.assertTrue(np.allclose(halo.pull(z_full), halo.pull(x_full) - halo.pull(y_full))) # test transpose x = dist.random.normal([234, 432]) y = dist.transpose(x) x_full = dist.assemble(x) y_full = dist.assemble(y) - self.assertTrue(np.alltrue(orchpy.pull(x_full).T == orchpy.pull(y_full))) + self.assertTrue(np.alltrue(halo.pull(x_full).T == halo.pull(y_full))) # test numpy_to_dist x = dist.random.normal([23, 45]) @@ -154,8 +154,8 @@ class ArraysDistTest(unittest.TestCase): w = dist.assemble(z) x_full = dist.assemble(x) z_full = dist.assemble(z) - self.assertTrue(np.alltrue(orchpy.pull(x_full) == orchpy.pull(z_full))) - self.assertTrue(np.alltrue(orchpy.pull(y) == orchpy.pull(w))) + self.assertTrue(np.alltrue(halo.pull(x_full) == halo.pull(z_full))) + self.assertTrue(np.alltrue(halo.pull(y) == halo.pull(w))) # test dist.tsqr for shape in [[123, dist.BLOCK_SIZE], [7, dist.BLOCK_SIZE], [dist.BLOCK_SIZE, dist.BLOCK_SIZE], [dist.BLOCK_SIZE, 7], [10 * dist.BLOCK_SIZE, dist.BLOCK_SIZE]]: @@ -163,10 +163,10 @@ class ArraysDistTest(unittest.TestCase): K = min(shape) q, r = dist.linalg.tsqr(x) x_full = dist.assemble(x) - x_val = orchpy.pull(x_full) + x_val = halo.pull(x_full) q_full = dist.assemble(q) - q_val = orchpy.pull(q_full) - r_val = orchpy.pull(r) + 
q_val = halo.pull(q_full) + r_val = halo.pull(r) self.assertTrue(r_val.shape == (K, shape[1])) self.assertTrue(np.alltrue(r_val == np.triu(r_val))) self.assertTrue(np.allclose(x_val, np.dot(q_val, r_val))) @@ -180,12 +180,12 @@ class ArraysDistTest(unittest.TestCase): m = single.random.normal([d1, d2]) q, r = single.linalg.qr(m) l, u, s = dist.linalg.modified_lu(dist.numpy_to_dist(q)) - q_val = orchpy.pull(q) - r_val = orchpy.pull(r) + q_val = halo.pull(q) + r_val = halo.pull(r) l_full = dist.assemble(l) - l_val = orchpy.pull(l_full) - u_val = orchpy.pull(u) - s_val = orchpy.pull(s) + l_val = halo.pull(l_full) + u_val = halo.pull(u) + s_val = halo.pull(s) s_mat = np.zeros((d1, d2)) for i in range(len(s_val)): s_mat[i, i] = s_val[i] @@ -202,17 +202,17 @@ class ArraysDistTest(unittest.TestCase): a = dist.random.normal([d1, d2]) y, t, y_top, r = dist.linalg.tsqr_hr(a) a_full = dist.assemble(a) - a_val = orchpy.pull(a_full) + a_val = halo.pull(a_full) y_full = dist.assemble(y) - y_val = orchpy.pull(y_full) - t_val = orchpy.pull(t) - y_top_val = orchpy.pull(y_top) - r_val = orchpy.pull(r) + y_val = halo.pull(y_full) + t_val = halo.pull(t) + y_top_val = halo.pull(y_top) + r_val = halo.pull(r) tall_eye = np.zeros((d1, min(d1, d2))) np.fill_diagonal(tall_eye, 1) q = tall_eye - np.dot(y_val, np.dot(t_val, y_top_val.T)) self.assertTrue(np.allclose(np.dot(q.T, q), np.eye(min(d1, d2)))) # check that q.T * q = I - self.assertTrue(np.allclose(np.dot(q, r_val), a_val)) # check that a = (I - y * t * y_top.T) * r + self.assertTrue(np.allclose(np.dot(q, r_val), a_val)) # check that a = (I - y * t * y_top.T) * r for d1, d2 in [(123, dist.BLOCK_SIZE), (7, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, 7), (10 * dist.BLOCK_SIZE, dist.BLOCK_SIZE)]: test_dist_tsqr_hr(d1, d2) @@ -225,9 +225,9 @@ class ArraysDistTest(unittest.TestCase): a_full = dist.assemble(a) q_full = dist.assemble(q) r_full = dist.assemble(r) - a_val = orchpy.pull(a_full) - q_val = 
orchpy.pull(q_full) - r_val = orchpy.pull(r_full) + a_val = halo.pull(a_full) + q_val = halo.pull(q_full) + r_val = halo.pull(r_full) self.assertTrue(q_val.shape == (d1, K)) self.assertTrue(r_val.shape == (K, d2)) diff --git a/test/gen-python-code.sh b/test/gen-python-code.sh index 595dab90d..8221c6b4b 100644 --- a/test/gen-python-code.sh +++ b/test/gen-python-code.sh @@ -1,4 +1,4 @@ # For running the python tests -protoc -I ../protos/ --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../protos/orchestra.proto +protoc -I ../protos/ --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../protos/halo.proto protoc -I ../protos/ --python_out=. --grpc_out=. --plugin=protoc-gen-grpc=`which grpc_python_plugin` ../protos/types.proto diff --git a/test/microbenchmarks.py b/test/microbenchmarks.py index 5e4df2feb..94c1b7d09 100644 --- a/test/microbenchmarks.py +++ b/test/microbenchmarks.py @@ -1,6 +1,6 @@ import unittest -import orchpy -import orchpy.services as services +import halo +import halo.services as services import time import os import numpy as np @@ -51,7 +51,7 @@ class MicroBenchmarkTest(unittest.TestCase): for _ in range(1000): start_time = time.time() x = test_functions.trivial_function() - orchpy.pull(x) + halo.pull(x) end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) @@ -67,7 +67,7 @@ class MicroBenchmarkTest(unittest.TestCase): elapsed_times = [] for _ in range(1000): start_time = time.time() - orchpy.push(1) + halo.push(1) end_time = time.time() elapsed_times.append(end_time - start_time) elapsed_times = np.sort(elapsed_times) diff --git a/test/runtest.py b/test/runtest.py index 1cb2387e7..c0f49bc4a 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1,8 +1,8 @@ import unittest -import orchpy -import orchpy.serialization as serialization -import orchpy.services as services -import orchpy.worker as worker +import halo +import halo.serialization 
as serialization +import halo.services as services +import halo.worker as worker import numpy as np import time import subprocess32 as subprocess @@ -10,7 +10,7 @@ import os from google.protobuf.text_format import * -import orchestra_pb2 +import halo_pb2 import types_pb2 import test_functions @@ -61,10 +61,10 @@ class SerializationTest(unittest.TestCase): self.numpyTypeTest(w, 'float32') self.numpyTypeTest(w, 'float64') - ref0 = orchpy.push(0, w) - ref1 = orchpy.push(0, w) - ref2 = orchpy.push(0, w) - ref3 = orchpy.push(0, w) + ref0 = halo.push(0, w) + ref1 = halo.push(0, w) + ref2 = halo.push(0, w) + ref3 = halo.push(0, w) a = np.array([[ref0, ref1], [ref2, ref3]]) capsule, _ = serialization.serialize(w.handle, a) result = serialization.deserialize(w.handle, capsule) @@ -80,8 +80,8 @@ class ObjStoreTest(unittest.TestCase): # pushing and pulling an object shouldn't change it for data in ["h", "h" * 10000, 0, 0.0]: - objref = orchpy.push(data, w1) - result = orchpy.pull(objref, w1) + objref = halo.push(data, w1) + result = halo.pull(objref, w1) self.assertEqual(result, data) # pushing an object, shipping it to another worker, and pulling it shouldn't change it @@ -134,7 +134,7 @@ class SchedulerTest(unittest.TestCase): time.sleep(0.2) - value_after = orchpy.pull(objref[0], w) + value_after = halo.pull(objref[0], w) self.assertEqual(value_before, value_after) time.sleep(0.1) @@ -148,26 +148,26 @@ class WorkerTest(unittest.TestCase): for i in range(100): value_before = i * 10 ** 6 - objref = orchpy.push(value_before, w) - value_after = orchpy.pull(objref, w) + objref = halo.push(value_before, w) + value_after = halo.pull(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = i * 10 ** 6 * 1.0 - objref = orchpy.push(value_before, w) - value_after = orchpy.pull(objref, w) + objref = halo.push(value_before, w) + value_after = halo.pull(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = "h" * i - 
objref = orchpy.push(value_before, w) - value_after = orchpy.pull(objref, w) + objref = halo.push(value_before, w) + value_after = halo.pull(objref, w) self.assertEqual(value_before, value_after) for i in range(100): value_before = [1] * i - objref = orchpy.push(value_before, w) - value_after = orchpy.pull(objref, w) + objref = halo.push(value_before, w) + value_after = halo.pull(objref, w) self.assertEqual(value_before, value_after) services.cleanup() @@ -180,11 +180,11 @@ class APITest(unittest.TestCase): [w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=test_path) objref = w.submit_task("test_functions.test_alias_f", []) - self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(halo.pull(objref[0], w) == np.ones([3, 4, 5]))) objref = w.submit_task("test_functions.test_alias_g", []) - self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(halo.pull(objref[0], w) == np.ones([3, 4, 5]))) objref = w.submit_task("test_functions.test_alias_h", []) - self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) + self.assertTrue(np.alltrue(halo.pull(objref[0], w) == np.ones([3, 4, 5]))) services.cleanup() @@ -193,35 +193,35 @@ class APITest(unittest.TestCase): test_path = os.path.join(test_dir, "testrecv.py") services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path) x = test_functions.keyword_fct1(1) - self.assertEqual(orchpy.pull(x), "1 hello") + self.assertEqual(halo.pull(x), "1 hello") x = test_functions.keyword_fct1(1, "hi") - self.assertEqual(orchpy.pull(x), "1 hi") + self.assertEqual(halo.pull(x), "1 hi") x = test_functions.keyword_fct1(1, b="world") - self.assertEqual(orchpy.pull(x), "1 world") + self.assertEqual(halo.pull(x), "1 world") x = test_functions.keyword_fct2(a="w", b="hi") - self.assertEqual(orchpy.pull(x), "w hi") + 
self.assertEqual(halo.pull(x), "w hi") x = test_functions.keyword_fct2(b="hi", a="w") - self.assertEqual(orchpy.pull(x), "w hi") + self.assertEqual(halo.pull(x), "w hi") x = test_functions.keyword_fct2(a="w") - self.assertEqual(orchpy.pull(x), "w world") + self.assertEqual(halo.pull(x), "w world") x = test_functions.keyword_fct2(b="hi") - self.assertEqual(orchpy.pull(x), "hello hi") + self.assertEqual(halo.pull(x), "hello hi") x = test_functions.keyword_fct2("w") - self.assertEqual(orchpy.pull(x), "w world") + self.assertEqual(halo.pull(x), "w world") x = test_functions.keyword_fct2("w", "hi") - self.assertEqual(orchpy.pull(x), "w hi") + self.assertEqual(halo.pull(x), "w hi") x = test_functions.keyword_fct3(0, 1, c="w", d="hi") - self.assertEqual(orchpy.pull(x), "0 1 w hi") + self.assertEqual(halo.pull(x), "0 1 w hi") x = test_functions.keyword_fct3(0, 1, d="hi", c="w") - self.assertEqual(orchpy.pull(x), "0 1 w hi") + self.assertEqual(halo.pull(x), "0 1 w hi") x = test_functions.keyword_fct3(0, 1, c="w") - self.assertEqual(orchpy.pull(x), "0 1 w world") + self.assertEqual(halo.pull(x), "0 1 w world") x = test_functions.keyword_fct3(0, 1, d="hi") - self.assertEqual(orchpy.pull(x), "0 1 hello hi") + self.assertEqual(halo.pull(x), "0 1 hello hi") x = test_functions.keyword_fct3(0, 1) - self.assertEqual(orchpy.pull(x), "0 1 hello world") + self.assertEqual(halo.pull(x), "0 1 hello world") services.cleanup() @@ -233,48 +233,48 @@ class ReferenceCountingTest(unittest.TestCase): services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path) x = test_functions.test_alias_f() - orchpy.pull(x) + halo.pull(x) time.sleep(0.1) objref_val = x.val - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val] == 1) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val] == 1) del x - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val] == -1) # -1 indicates deallocated + 
self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val] == -1) # -1 indicates deallocated y = test_functions.test_alias_h() - orchpy.pull(y) + halo.pull(y) time.sleep(0.1) objref_val = y.val - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 0, 0]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 0, 0]) del y - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) z = dist.zeros([dist.BLOCK_SIZE, 2 * dist.BLOCK_SIZE], "float") time.sleep(0.1) objref_val = z.val - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) del z time.sleep(0.1) - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) x = single.zeros([10, 10], "float") y = single.zeros([10, 10], "float") z = single.dot(x, y) objref_val = x.val time.sleep(0.1) - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [1, 1, 1]) del x time.sleep(0.1) - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, 1, 1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, 1, 1]) del y time.sleep(0.1) - self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, 1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, 1]) del z time.sleep(0.1) 
- self.assertTrue(orchpy.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) + self.assertTrue(halo.scheduler_info()["reference_counts"][objref_val:(objref_val + 3)] == [-1, -1, -1]) services.cleanup() diff --git a/test/shell.py b/test/shell.py index 2e420d0bd..c930b7ec5 100644 --- a/test/shell.py +++ b/test/shell.py @@ -1,16 +1,16 @@ import argparse import numpy as np -import orchpy -import orchpy.services as services -import orchpy.worker as worker +import halo +import halo.services as services +import halo.worker as worker import test_functions import arrays.single as single import arrays.dist as dist from grpc.beta import implementations -import orchestra_pb2 +import halo_pb2 import types_pb2 TIMEOUT_SECONDS = 5 diff --git a/test/test_functions.py b/test/test_functions.py index 6af075475..e91a0d0e3 100644 --- a/test/test_functions.py +++ b/test/test_functions.py @@ -1,54 +1,54 @@ -import orchpy +import halo import numpy as np # Test simple functionality -@orchpy.distributed([str], [str]) +@halo.distributed([str], [str]) def print_string(string): print "called print_string with", string f = open("asdfasdf.txt", "w") f.write("successfully called print_string with argument {}.".format(string)) return string -@orchpy.distributed([int, int], [int, int]) +@halo.distributed([int, int], [int, int]) def handle_int(a, b): return a + 1, b + 1 # Test aliasing -@orchpy.distributed([], [np.ndarray]) +@halo.distributed([], [np.ndarray]) def test_alias_f(): return np.ones([3, 4, 5]) -@orchpy.distributed([], [np.ndarray]) +@halo.distributed([], [np.ndarray]) def test_alias_g(): return test_alias_f() -@orchpy.distributed([], [np.ndarray]) +@halo.distributed([], [np.ndarray]) def test_alias_h(): return test_alias_g() # Test timing -@orchpy.distributed([], []) +@halo.distributed([], []) def empty_function(): return () -@orchpy.distributed([], [int]) +@halo.distributed([], [int]) def trivial_function(): return 1 # Test keyword arguments 
-@orchpy.distributed([int, str], [str]) +@halo.distributed([int, str], [str]) def keyword_fct1(a, b="hello"): return "{} {}".format(a, b) -@orchpy.distributed([str, str], [str]) +@halo.distributed([str, str], [str]) def keyword_fct2(a="hello", b="world"): return "{} {}".format(a, b) -@orchpy.distributed([int, int, str, str], [str]) +@halo.distributed([int, int, str, str], [str]) def keyword_fct3(a, b, c="hello", d="world"): return "{} {} {} {}".format(a, b, c, d) diff --git a/test/testrecv.py b/test/testrecv.py index 70f9099e9..23b54f011 100644 --- a/test/testrecv.py +++ b/test/testrecv.py @@ -6,9 +6,9 @@ import test_functions import arrays.single as single import arrays.dist as dist -import orchpy -import orchpy.services as services -import orchpy.worker as worker +import halo +import halo.services as services +import halo.worker as worker parser = argparse.ArgumentParser(description='Parse addresses for the worker to connect to.') parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") @@ -19,13 +19,13 @@ if __name__ == '__main__': args = parser.parse_args() worker.connect(args.scheduler_address, args.objstore_address, args.worker_address) - orchpy.register_module(test_functions) - orchpy.register_module(single) - orchpy.register_module(single.random) - orchpy.register_module(single.linalg) - orchpy.register_module(dist) - orchpy.register_module(dist.random) - orchpy.register_module(dist.linalg) - orchpy.register_module(sys.modules[__name__]) + halo.register_module(test_functions) + halo.register_module(single) + halo.register_module(single.random) + halo.register_module(single.linalg) + halo.register_module(dist) + halo.register_module(dist.random) + halo.register_module(dist.linalg) + halo.register_module(sys.modules[__name__]) worker.main_loop()