From 37ac8faae5e6af040ae089d3a0bd31b988055aef Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 19 Apr 2016 14:44:07 -0700 Subject: [PATCH] implement distributed qr (#30) --- lib/orchpy/arrays/dist/__init__.py | 2 +- lib/orchpy/arrays/dist/core.py | 201 ++++++++++++++++----------- lib/orchpy/arrays/dist/linalg.py | 192 +++++++++++++++++++++++++ lib/orchpy/arrays/single/__init__.py | 2 +- lib/orchpy/arrays/single/core.py | 31 +++++ lib/orchpy/arrays/single/linalg.py | 36 ----- lib/orchpy/orchpy/worker.py | 4 +- test/arrays_test.py | 123 +++++++++++++++- test/testrecv.py | 2 +- 9 files changed, 471 insertions(+), 122 deletions(-) create mode 100644 lib/orchpy/arrays/dist/linalg.py diff --git a/lib/orchpy/arrays/dist/__init__.py b/lib/orchpy/arrays/dist/__init__.py index 3200379ef..d61616df6 100644 --- a/lib/orchpy/arrays/dist/__init__.py +++ b/lib/orchpy/arrays/dist/__init__.py @@ -1,2 +1,2 @@ -import random +import random, linalg from core import * diff --git a/lib/orchpy/arrays/dist/core.py b/lib/orchpy/arrays/dist/core.py index 08d379b31..0e22d37e3 100644 --- a/lib/orchpy/arrays/dist/core.py +++ b/lib/orchpy/arrays/dist/core.py @@ -4,16 +4,16 @@ import arrays.single as single import orchpy as op __all__ = ["BLOCK_SIZE", "DistArray", "assemble", "zeros", "ones", "copy", - "eye", "triu", "tril", "blockwise_dot", "dot", "block_column", "block_row"] + "eye", "triu", "tril", "blockwise_dot", "dot", "transpose", "add", "subtract", "eye2", "numpy_to_dist", "subblocks"] BLOCK_SIZE = 10 class DistArray(object): - def construct(self, shape, objrefs): + def construct(self, shape, objrefs=None): self.shape = shape - self.objrefs = objrefs self.ndim = len(shape) self.num_blocks = [int(np.ceil(1.0 * a / BLOCK_SIZE)) for a in self.shape] + self.objrefs = objrefs if objrefs is not None else np.empty(self.num_blocks, dtype=object) if self.num_blocks != list(self.objrefs.shape): raise Exception("The fields `num_blocks` and `objrefs` are inconsistent, `num_blocks` is {} and `objrefs` has shape {}".format(self.num_blocks, list(self.objrefs.shape))) @@ -24,20 +24,20 @@ class DistArray(object): def serialize(self): return (self.shape, self.objrefs) - def __init__(self): - self.shape = None - self.objrefs = None + def __init__(self, shape=None): + if shape is not None: + self.construct(shape) @staticmethod def compute_block_lower(index, shape): - # TODO(rkn): Check that the entries of index are in the correct range. - # TODO(rkn): Check that len(index) == len(shape). + if len(index) != len(shape): + raise Exception("The fields `index` and `shape` must have the same length, but `index` is {} and `shape` is {}.".format(index, shape)) return [elem * BLOCK_SIZE for elem in index] @staticmethod def compute_block_upper(index, shape): - # TODO(rkn): Check that the entries of index are in the correct range. - # TODO(rkn): Check that len(index) == len(shape). + if len(index) != len(shape): + raise Exception("The fields `index` and `shape` must have the same length, but `index` is {} and `shape` is {}.".format(index, shape)) upper = [] for i in range(len(shape)): upper.append(min((index[i] + 1) * BLOCK_SIZE, shape[i])) @@ -73,82 +73,87 @@ class DistArray(object): def assemble(a): return a.assemble() +# TODO(rkn): what should we call this method +@op.distributed([np.ndarray], [DistArray]) +def numpy_to_dist(a): + result = DistArray(a.shape) + for index in np.ndindex(*result.num_blocks): + lower = DistArray.compute_block_lower(index, a.shape) + upper = DistArray.compute_block_upper(index, a.shape) + result.objrefs[index] = op.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]]) + return result + @op.distributed([List[int], str], [DistArray]) def zeros(shape, dtype_name): - num_blocks = DistArray.compute_num_blocks(shape) - objrefs = np.empty(num_blocks, dtype=object) - for index in np.ndindex(*num_blocks): - objrefs[index] = single.zeros(DistArray.compute_block_shape(index, shape), dtype_name) - result = DistArray() - result.construct(shape, objrefs) + result = DistArray(shape) + for index in np.ndindex(*result.num_blocks): + result.objrefs[index] = single.zeros(DistArray.compute_block_shape(index, shape), dtype_name) return result @op.distributed([List[int], str], [DistArray]) def ones(shape, dtype_name): - num_blocks = DistArray.compute_num_blocks(shape) - objrefs = np.empty(num_blocks, dtype=object) - for index in np.ndindex(*num_blocks): - objrefs[index] = single.ones(DistArray.compute_block_shape(index, shape), dtype_name) - result = DistArray() - result.construct(shape, objrefs) + result = DistArray(shape) + for index in np.ndindex(*result.num_blocks): + result.objrefs[index] = single.ones(DistArray.compute_block_shape(index, shape), dtype_name) return result @op.distributed([DistArray], [DistArray]) def copy(a): - num_blocks = DistArray.compute_num_blocks(a.shape) - objrefs = np.empty(num_blocks, dtype=object) - for index in np.ndindex(*num_blocks): - objrefs[index] = single.copy(a.objrefs[index]) - result = DistArray() - result.construct(a.shape, objrefs) + result = DistArray(a.shape) + for index in np.ndindex(*result.num_blocks): + result.objrefs[index] = a.objrefs[index] # We don't need to actually copy the objects because cluster-level objects are assumed to be immutable. return result @op.distributed([int, str], [DistArray]) def eye(dim, dtype_name): shape = [dim, dim] - num_blocks = DistArray.compute_num_blocks(shape) - objrefs = np.empty(num_blocks, dtype=object) - for (i, j) in np.ndindex(*num_blocks): + result = DistArray(shape) + for (i, j) in np.ndindex(*result.num_blocks): if i == j: - objrefs[i, j] = single.eye(DistArray.compute_block_shape([i, j], shape)[0], dtype_name) + result.objrefs[i, j] = single.eye(DistArray.compute_block_shape([i, j], shape)[0], dtype_name) else: - objrefs[i, j] = single.zeros(DistArray.compute_block_shape([i, j], shape), dtype_name) - result = DistArray() - result.construct(shape, objrefs) + result.objrefs[i, j] = single.zeros(DistArray.compute_block_shape([i, j], shape), dtype_name) + return result + +# TODO(rkn): Support optional arguments so that we can make this part of eye. +@op.distributed([int, int, str], [DistArray]) +def eye2(dim1, dim2, dtype_name): + shape = [dim1, dim2] + result = DistArray(shape) + for (i, j) in np.ndindex(*result.num_blocks): + block_shape = DistArray.compute_block_shape([i, j], shape) + if i == j: + result.objrefs[i, j] = single.eye2(block_shape[0], block_shape[1], dtype_name) + else: + result.objrefs[i, j] = single.zeros(block_shape, dtype_name) return result @op.distributed([DistArray], [DistArray]) def triu(a): if a.ndim != 2: raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim)) - objrefs = np.empty(a.num_blocks, dtype=object) - for i in range(a.num_blocks[0]): - for j in range(a.num_blocks[1]): - if i < j: - objrefs[i, j] = single.copy(a.objrefs[i, j]) - elif i == j: - objrefs[i, j] = single.triu(a.objrefs[i, j]) - else: - objrefs[i, j] = single.zeros_like(a.objrefs[i, j]) - result = DistArray() - result.construct(a.shape, objrefs) + result = DistArray(a.shape) + for (i, j) in np.ndindex(*result.num_blocks): + if i < j: + result.objrefs[i, j] = single.copy(a.objrefs[i, j]) + elif i == j: + result.objrefs[i, j] = single.triu(a.objrefs[i, j]) + else: + result.objrefs[i, j] = single.zeros_like(a.objrefs[i, j]) return result @op.distributed([DistArray], [DistArray]) def tril(a): if a.ndim != 2: raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim)) - objrefs = np.empty(a.num_blocks, dtype=object) - for i in range(a.num_blocks[0]): - for j in range(a.num_blocks[1]): - if i > j: - objrefs[i, j] = single.copy(a.objrefs[i, j]) - elif i == j: - objrefs[i, j] = single.tril(a.objrefs[i, j]) - else: - objrefs[i, j] = single.zeros_like(a.objrefs[i, j]) - result = DistArray() - result.construct(a.shape, objrefs) + result = DistArray(a.shape) + for (i, j) in np.ndindex(*result.num_blocks): + if i > j: + result.objrefs[i, j] = single.copy(a.objrefs[i, j]) + elif i == j: + result.objrefs[i, j] = single.tril(a.objrefs[i, j]) + else: + result.objrefs[i, j] = single.zeros_like(a.objrefs[i, j]) return result @op.distributed([np.ndarray, None], [np.ndarray]) @@ -171,34 +176,70 @@ def dot(a, b): if a.shape[1] != b.shape[0]: raise Exception("dot expects a.shape[1] to equal b.shape[0], but a.shape = {} and b.shape = {}.".format(a.shape, b.shape)) shape = [a.shape[0], b.shape[1]] - num_blocks = DistArray.compute_num_blocks(shape) - objrefs = np.empty(num_blocks, dtype=object) - for i in range(num_blocks[0]): - for j in range(num_blocks[1]): - args = list(a.objrefs[i, :]) + list(b.objrefs[:, j]) - objrefs[i, j] = blockwise_dot(*args) - result = DistArray() - result.construct(shape, objrefs) + result = DistArray(shape) + for (i, j) in np.ndindex(*result.num_blocks): + args = list(a.objrefs[i, :]) + list(b.objrefs[:, j]) + result.objrefs[i, j] = blockwise_dot(*args) return result # This is not in numpy, should we expose this? -@op.distributed([DistArray], [DistArray]) -def block_column(a, col): - if a.ndim != 2: - raise Exception("block_column expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape)) - top_block_shape = DistArray.compute_block_shape([0, col]) - shape = [a.shape[0], top_block_shape[1]] - result = DistArray() - result.construct(shape, a.objrefs[:, col]) +@op.distributed([DistArray, List[int], None], [DistArray]) +def subblocks(a, *ranges): + """ + This function produces a distributed array from a subset of the blocks in the `a`. The result and `a` will have the same number of dimensions.For example, + subblocks(a, [0, 1], [2, 4]) + will produce a DistArray whose objrefs are + [[a.objrefs[0, 2], a.objrefs[0, 4]], + [a.objrefs[1, 2], a.objrefs[1, 4]]] + We allow the user to pass in an empty list [] to indicate the full range. + """ + ranges = list(ranges) + if len(ranges) != a.ndim: + raise Exception("sub_blocks expects to receive a number of ranges equal to a.ndim, but it received {} ranges and a.ndim = {}.".format(len(ranges), a.ndim)) + for i in range(len(ranges)): + if ranges[i] == []: # We allow the user to pass in an empty list to indicate the full range + ranges[i] = range(a.num_blocks[i]) + if not np.alltrue(ranges[i] == np.sort(ranges[i])): + raise Exception("Ranges passed to sub_blocks must be sorted, but the {}th range is {}.".format(i, ranges[i])) + if ranges[i][0] < 0: + raise Exception("Values in the ranges passed to sub_blocks must be at least 0, but the {}th range is {}.".format(i, ranges[i])) + if ranges[i][-1] >= a.num_blocks[i]: + raise Exception("Values in the ranges passed to sub_blocks must be less than the relevant number of blocks, but the {}th range is {}, and a.num_blocks = {}.".format(i, ranges[i], a.num_blocks)) + last_index = [r[-1] for r in ranges] + last_block_shape = DistArray.compute_block_shape(last_index, a.shape) + shape = [(len(ranges[i]) - 1) * BLOCK_SIZE + last_block_shape[i] for i in range(a.ndim)] + result = DistArray(shape) + for index in np.ndindex(*result.num_blocks): + print tuple([ranges[i][index[i]] for i in range(a.ndim)]) + result.objrefs[index] = a.objrefs[tuple([ranges[i][index[i]] for i in range(a.ndim)])] return result -# This is not in numpy, should we expose this? @op.distributed([DistArray], [DistArray]) -def block_row(a, row): +def transpose(a): if a.ndim != 2: - raise Exception("block_row expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape)) - left_block_shape = DistArray.compute_block_shape([row, 0]) - shape = [left_block_shape[0], a.shape[1]] - result = DistArray() - result.construct(shape, a.objrefs[row, :]) + raise Exception("transpose expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape)) + result = DistArray([a.shape[1], a.shape[0]]) + for i in range(result.num_blocks[0]): + for j in range(result.num_blocks[1]): + result.objrefs[i, j] = single.transpose(a.objrefs[j, i]) + return result + +# TODO(rkn): support broadcasting? +@op.distributed([DistArray, DistArray], [DistArray]) +def add(x1, x2): + if x1.shape != x2.shape: + raise Exception("add expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape)) + result = DistArray(x1.shape) + for index in np.ndindex(*result.num_blocks): + result.objrefs[index] = single.add(x1.objrefs[index], x2.objrefs[index]) + return result + +# TODO(rkn): support broadcasting? +@op.distributed([DistArray, DistArray], [DistArray]) +def subtract(x1, x2): + if x1.shape != x2.shape: + raise Exception("subtract expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape)) + result = DistArray(x1.shape) + for index in np.ndindex(*result.num_blocks): + result.objrefs[index] = single.subtract(x1.objrefs[index], x2.objrefs[index]) return result diff --git a/lib/orchpy/arrays/dist/linalg.py b/lib/orchpy/arrays/dist/linalg.py new file mode 100644 index 000000000..1d9eba246 --- /dev/null +++ b/lib/orchpy/arrays/dist/linalg.py @@ -0,0 +1,192 @@ +from typing import List + +import numpy as np +import arrays.single as single +import orchpy as op + +from core import * + +__all__ = ["tsqr", "modified_lu", "tsqr_hr", "qr"] + +@op.distributed([DistArray], [DistArray, np.ndarray]) +def tsqr(a): + """ + arguments: + a: a distributed matrix + Suppose that + a.shape == (M, N) + K == min(M, N) + return values: + q: DistArray, if q_full = op.context.pull(DistArray, q).assemble(), then + q_full.shape == (M, K) + np.allclose(np.dot(q_full.T, q_full), np.eye(K)) == True + r: np.ndarray, if r_val = op.context.pull(np.ndarray, r), then + r_val.shape == (K, N) + np.allclose(r, np.triu(r)) == True + """ + if len(a.shape) != 2: + raise Exception("tsqr requires len(a.shape) == 2, but a.shape is {}".format(a.shape)) + if a.num_blocks[1] != 1: + raise Exception("tsqr requires a.num_blocks[1] == 1, but a.num_blocks is {}".format(a.num_blocks)) + + num_blocks = a.num_blocks[0] + K = int(np.ceil(np.log2(num_blocks))) + 1 + q_tree = np.empty((num_blocks, K), dtype=object) + current_rs = [] + for i in range(num_blocks): + block = a.objrefs[i, 0] + q, r = single.linalg.qr(block) + q_tree[i, 0] = q + current_rs.append(r) + for j in range(1, K): + new_rs = [] + for i in range(int(np.ceil(1.0 * len(current_rs) / 2))): + stacked_rs = single.vstack(*current_rs[(2 * i):(2 * i + 2)]) + q, r = single.linalg.qr(stacked_rs) + q_tree[i, j] = q + new_rs.append(r) + current_rs = new_rs + assert len(current_rs) == 1, "len(current_rs) = " + str(len(current_rs)) + + q_result = DistArray() + + # handle the special case in which the whole DistArray "a" fits in one block + # and has fewer rows than columns, this is a bit ugly so think about how to + # remove it + if a.shape[0] >= a.shape[1]: + q_shape = a.shape + else: + q_shape = [a.shape[0], a.shape[0]] + q_num_blocks = DistArray.compute_num_blocks(q_shape) + q_result = DistArray() + q_objrefs = np.empty(q_num_blocks, dtype=object) + q_result.construct(q_shape, q_objrefs) + + # reconstruct output + for i in range(num_blocks): + q_block_current = q_tree[i, 0] + ith_index = i + for j in range(1, K): + if np.mod(ith_index, 2) == 0: + lower = [0, 0] + upper = [a.shape[1], BLOCK_SIZE] + else: + lower = [a.shape[1], 0] + upper = [2 * a.shape[1], BLOCK_SIZE] + ith_index /= 2 + q_block_current = single.dot(q_block_current, single.subarray(q_tree[ith_index, j], lower, upper)) + q_result.objrefs[i] = q_block_current + r = current_rs[0] + return q_result, r + +# TODO(rkn): This is unoptimized, we really want a block version of this. +@op.distributed([DistArray], [DistArray, np.ndarray, np.ndarray]) +def modified_lu(q): + """ + Algorithm 5 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf + takes a matrix q with orthonormal columns, returns l, u, s such that q - s = l * u + arguments: + q: a two dimensional orthonormal q + return values: + l: lower triangular + u: upper triangular + s: a diagonal matrix represented by its diagonal + """ + q = q.assemble() + m, b = q.shape[0], q.shape[1] + S = np.zeros(b) + + q_work = np.copy(q) + + for i in range(b): + S[i] = -1 * np.sign(q_work[i, i]) + q_work[i, i] -= S[i] + q_work[(i + 1):m, i] /= q_work[i, i] # scale ith column of L by diagonal element + q_work[(i + 1):m, (i + 1):b] -= np.outer(q_work[(i + 1):m, i], q_work[i, (i + 1):b]) # perform Schur complement update + + L = np.tril(q_work) + for i in range(b): + L[i, i] = 1 + U = np.triu(q_work)[:b, :] + return numpy_to_dist(op.push(L)), U, S # TODO(rkn): get rid of push and pull + +@op.distributed([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray]) +def tsqr_hr_helper1(u, s, y_top_block, b): + y_top = y_top_block[:b, :b] + s_full = np.diag(s) + t = -1 * np.dot(u, np.dot(s_full, np.linalg.inv(y_top).T)) + return t, y_top + +@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +def tsqr_hr_helper2(s, r_temp): + s_full = np.diag(s) + return np.dot(s_full, r_temp) + +@op.distributed([DistArray], [DistArray, np.ndarray, np.ndarray, np.ndarray]) +def tsqr_hr(a): + """Algorithm 6 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" + q, r_temp = tsqr(a) + y, u, s = modified_lu(q) + y_blocked = op.pull(y) + t, y_top = tsqr_hr_helper1(u, s, y_blocked.objrefs[0, 0], a.shape[1]) + r = tsqr_hr_helper2(s, r_temp) + return y, t, y_top, r + +@op.distributed([np.ndarray, np.ndarray, np.ndarray, np.ndarray], [np.ndarray]) +def qr_helper1(a_rc, y_ri, t, W_c): + return a_rc - np.dot(y_ri, np.dot(t.T, W_c)) + +@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +def qr_helper2(y_ri, a_rc): + return np.dot(y_ri.T, a_rc) + +@op.distributed([DistArray], [DistArray, DistArray]) +def qr(a): + """Algorithm 7 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf""" + m, n = a.shape[0], a.shape[1] + k = min(m, n) + + # we will store our scratch work in a_work + a_work = DistArray() + a_work.construct(a.shape, np.copy(a.objrefs)) + + result_dtype = np.linalg.qr(op.pull(a.objrefs[0, 0]))[0].dtype.name + r_res = op.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + y_res = op.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it. + Ts = [] + + for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0] + sub_dist_array = subblocks(a_work, range(i, a_work.num_blocks[0]), [i]) + y, t, _, R = tsqr_hr(sub_dist_array) + y_val = op.pull(y) + + for j in range(i, a.num_blocks[0]): + y_res.objrefs[j, i] = y_val.objrefs[j - i, 0] + if a.shape[0] > a.shape[1]: + # in this case, R needs to be square + R_shape = op.pull(single.shape(R)) + eye_temp = single.eye2(R_shape[1], R_shape[0], result_dtype) + r_res.objrefs[i, i] = single.dot(eye_temp, R) + else: + r_res.objrefs[i, i] = R + Ts.append(numpy_to_dist(t)) + + for c in range(i + 1, a.num_blocks[1]): + W_rcs = [] + for r in range(i, a.num_blocks[0]): + y_ri = y_val.objrefs[r - i, 0] + W_rcs.append(qr_helper2(y_ri, a_work.objrefs[r, c])) + W_c = single.sum(0, *W_rcs) + for r in range(i, a.num_blocks[0]): + y_ri = y_val.objrefs[r - i, 0] + A_rc = qr_helper1(a_work.objrefs[r, c], y_ri, t, W_c) + a_work.objrefs[r, c] = A_rc + r_res.objrefs[i, c] = a_work.objrefs[i, c] + + # construct q_res from Ys and Ts + q = eye2(m, k, result_dtype) + for i in range(len(Ts))[::-1]: + y_col_block = subblocks(y_res, [], [i]) + q = subtract(q, dot(y_col_block, dot(Ts[i], dot(transpose(y_col_block), q)))) + + return q, r_res diff --git a/lib/orchpy/arrays/single/__init__.py b/lib/orchpy/arrays/single/__init__.py index 967afbb62..d61616df6 100644 --- a/lib/orchpy/arrays/single/__init__.py +++ b/lib/orchpy/arrays/single/__init__.py @@ -1,2 +1,2 @@ import random, linalg -from core import zeros, zeros_like, ones, eye, dot, vstack, hstack, subarray, copy, tril, triu +from core import * diff --git a/lib/orchpy/arrays/single/core.py b/lib/orchpy/arrays/single/core.py index e634dd8fb..05ca1a545 100644 --- a/lib/orchpy/arrays/single/core.py +++ b/lib/orchpy/arrays/single/core.py @@ -2,6 +2,8 @@ from typing import List import numpy as np import orchpy as op +__all__ = ["zeros", "zeros_like", "ones", "eye", "dot", "vstack", "hstack", "subarray", "copy", "tril", "triu", "diag", "transpose", "add", "subtract", "eye2", "sum", "shape"] + @op.distributed([List[int], str], [np.ndarray]) def zeros(shape, dtype_name): return np.zeros(shape, dtype=np.dtype(dtype_name)) @@ -18,6 +20,11 @@ def ones(shape, dtype_name): def eye(dim, dtype_name): return np.eye(dim, dtype=np.dtype(dtype_name)) +# TODO(rkn): This should be part of eye +@op.distributed([int, int, str], [np.ndarray]) +def eye2(dim1, dim2, dtype_name): + return np.eye(dim1, dim2, dtype=np.dtype(dtype_name)) + @op.distributed([np.ndarray, np.ndarray], [np.ndarray]) def dot(a, b): return np.dot(a, b) @@ -49,3 +56,27 @@ def tril(a): @op.distributed([np.ndarray], [np.ndarray]) def triu(a): return np.triu(a) + +@op.distributed([np.ndarray], [np.ndarray]) +def diag(a): + return np.diag(a) + +@op.distributed([np.ndarray], [np.ndarray]) +def transpose(a): + return np.transpose(a) + +@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +def add(x1, x2): + return np.add(x1, x2) + +@op.distributed([np.ndarray, np.ndarray], [np.ndarray]) +def subtract(x1, x2): + return np.subtract(x1, x2) + +@op.distributed([int, np.ndarray, None], [np.ndarray]) +def sum(axis, *xs): + return np.sum(xs, axis=axis) + +@op.distributed([np.ndarray], [tuple]) +def shape(a): + return np.shape(a) diff --git a/lib/orchpy/arrays/single/linalg.py b/lib/orchpy/arrays/single/linalg.py index 1c969247d..5fbd7b4a1 100644 --- a/lib/orchpy/arrays/single/linalg.py +++ b/lib/orchpy/arrays/single/linalg.py @@ -86,39 +86,3 @@ def matrix_rank(M): @op.distributed([np.ndarray, None], [np.ndarray]) def multi_dot(a): raise NotImplementedError - - - -# This isn't in numpy, should we expose it? -@op.distributed([np.ndarray], [np.ndarray, np.ndarray, np.ndarray]) -def modified_lu(q): - """ - Algorithm 5 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf - takes a matrix q with orthonormal columns, returns l, u, s such that q - s = l * u - arguments: - q: a two dimensional orthonormal q - return values: - l: lower triangular - u: upper triangular - s: a diagonal matrix represented by its diagonal - """ - m, b = q.shape[0], q.shape[1] - S = np.zeros(b) - - q_work = np.copy(q) - - for i in range(b): - S[i] = -1 * np.sign(q_work[i, i]) - q_work[i, i] -= S[i] - - # scale ith column of L by diagonal element - q_work[(i + 1):m, i] /= q_work[i, i] - - # perform Schur complement update - q_work[(i + 1):m, (i + 1):b] -= np.outer(q_work[(i + 1):m, i], q_work[i, (i + 1):b]) - - L = np.tril(q_work) - for i in range(b): - L[i, i] = 1 - U = np.triu(q_work)[:b, :] - return L, U, S diff --git a/lib/orchpy/orchpy/worker.py b/lib/orchpy/orchpy/worker.py index 712cf1325..79d0be25d 100644 --- a/lib/orchpy/orchpy/worker.py +++ b/lib/orchpy/orchpy/worker.py @@ -99,10 +99,10 @@ def distributed(arg_types, return_types, worker=global_worker): def distributed_decorator(func): def func_executor(arguments): """This is what gets executed remotely on a worker after a distributed function is scheduled by the scheduler.""" - print "Calling function {} with arguments {}".format(func.__name__, arguments) + print "Calling function {}".format(func.__name__) result = func(*arguments) check_return_values(func_call, result) # throws an exception if result is invalid - print "Finished executing function {} with arguments {}".format(func.__name__, arguments) + print "Finished executing function {}".format(func.__name__) return result def func_call(*args): """This is what gets run immediately when a worker calls a distributed function.""" diff --git a/test/arrays_test.py b/test/arrays_test.py index 71dc725fe..dd442be3d 100644 --- a/test/arrays_test.py +++ b/test/arrays_test.py @@ -82,7 +82,7 @@ class ArraysDistTest(unittest.TestCase): def testMethods(self): test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - services.start_cluster(num_workers=4, worker_path=test_path) + services.start_cluster(num_workers=8, worker_path=test_path) x = dist.zeros([9, 25, 51], "float") y = dist.assemble(x) @@ -123,6 +123,127 @@ class ArraysDistTest(unittest.TestCase): np.allclose(orchpy.pull(w), np.dot(orchpy.pull(u), orchpy.pull(v))) self.assertTrue(np.allclose(orchpy.pull(w), np.dot(orchpy.pull(u), orchpy.pull(v)))) + # test add + x = dist.random.normal([23, 42]) + y = dist.random.normal([23, 42]) + z = dist.add(x, y) + z_full = dist.assemble(z) + x_full = dist.assemble(x) + y_full = dist.assemble(y) + self.assertTrue(np.allclose(orchpy.pull(z_full), orchpy.pull(x_full) + orchpy.pull(y_full))) + + # test subtract + x = dist.random.normal([33, 40]) + y = dist.random.normal([33, 40]) + z = dist.subtract(x, y) + z_full = dist.assemble(z) + x_full = dist.assemble(x) + y_full = dist.assemble(y) + self.assertTrue(np.allclose(orchpy.pull(z_full), orchpy.pull(x_full) - orchpy.pull(y_full))) + + # test transpose + x = dist.random.normal([234, 432]) + y = dist.transpose(x) + x_full = dist.assemble(x) + y_full = dist.assemble(y) + self.assertTrue(np.alltrue(orchpy.pull(x_full).T == orchpy.pull(y_full))) + + # test numpy_to_dist + x = dist.random.normal([23, 45]) + y = dist.assemble(x) + z = dist.numpy_to_dist(y) + w = dist.assemble(z) + x_full = dist.assemble(x) + z_full = dist.assemble(z) + self.assertTrue(np.alltrue(orchpy.pull(x_full) == orchpy.pull(z_full))) + self.assertTrue(np.alltrue(orchpy.pull(y) == orchpy.pull(w))) + + # test dist.tsqr + for shape in [[123, dist.BLOCK_SIZE], [7, dist.BLOCK_SIZE], [dist.BLOCK_SIZE, dist.BLOCK_SIZE], [dist.BLOCK_SIZE, 7], [10 * dist.BLOCK_SIZE, dist.BLOCK_SIZE]]: + x = dist.random.normal(shape) + K = min(shape) + q, r = dist.linalg.tsqr(x) + x_full = dist.assemble(x) + x_val = orchpy.pull(x_full) + q_full = dist.assemble(q) + q_val = orchpy.pull(q_full) + r_val = orchpy.pull(r) + self.assertTrue(r_val.shape == (K, shape[1])) + self.assertTrue(np.alltrue(r_val == np.triu(r_val))) + self.assertTrue(np.allclose(x_val, np.dot(q_val, r_val))) + self.assertTrue(np.allclose(np.dot(q_val.T, q_val), np.eye(K))) + + # test dist.linalg.modified_lu + def test_modified_lu(d1, d2): + print "testing dist_modified_lu with d1 = " + str(d1) + ", d2 = " + str(d2) + assert d1 >= d2 + k = min(d1, d2) + m = single.random.normal([d1, d2]) + q, r = single.linalg.qr(m) + l, u, s = dist.linalg.modified_lu(dist.numpy_to_dist(q)) + q_val = orchpy.pull(q) + r_val = orchpy.pull(r) + l_full = dist.assemble(l) + l_val = orchpy.pull(l_full) + u_val = orchpy.pull(u) + s_val = orchpy.pull(s) + s_mat = np.zeros((d1, d2)) + for i in range(len(s_val)): + s_mat[i, i] = s_val[i] + self.assertTrue(np.allclose(q_val - s_mat, np.dot(l_val, u_val))) # check that q - s = l * u + self.assertTrue(np.alltrue(np.triu(u_val) == u_val)) # check that u is upper triangular + self.assertTrue(np.alltrue(np.tril(l_val) == l_val)) # check that l is lower triangular + + for d1, d2 in [(100, 100), (99, 98), (7, 5), (7, 7), (20, 7), (20, 10)]: + test_modified_lu(d1, d2) + + # test dist_tsqr_hr + def test_dist_tsqr_hr(d1, d2): + print "testing dist_tsqr_hr with d1 = " + str(d1) + ", d2 = " + str(d2) + a = dist.random.normal([d1, d2]) + y, t, y_top, r = dist.linalg.tsqr_hr(a) + a_full = dist.assemble(a) + a_val = orchpy.pull(a_full) + y_full = dist.assemble(y) + y_val = orchpy.pull(y_full) + t_val = orchpy.pull(t) + y_top_val = orchpy.pull(y_top) + r_val = orchpy.pull(r) + tall_eye = np.zeros((d1, min(d1, d2))) + np.fill_diagonal(tall_eye, 1) + q = tall_eye - np.dot(y_val, np.dot(t_val, y_top_val.T)) + self.assertTrue(np.allclose(np.dot(q.T, q), np.eye(min(d1, d2)))) # check that q.T * q = I + self.assertTrue(np.allclose(np.dot(q, r_val), a_val)) # check that a = (I - y * t * y_top.T) * r + + for d1, d2 in [(123, dist.BLOCK_SIZE), (7, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, 7), (10 * dist.BLOCK_SIZE, dist.BLOCK_SIZE)]: + test_dist_tsqr_hr(d1, d2) + + def test_dist_qr(d1, d2): + print "testing qr with d1 = {}, and d2 = {}.".format(d1, d2) + a = dist.random.normal([d1, d2]) + K = min(d1, d2) + q, r = dist.linalg.qr(a) + a_full = dist.assemble(a) + q_full = dist.assemble(q) + r_full = dist.assemble(r) + a_val = orchpy.pull(a_full) + q_val = orchpy.pull(q_full) + r_val = orchpy.pull(r_full) + + self.assertTrue(q_val.shape == (d1, K)) + self.assertTrue(r_val.shape == (K, d2)) + self.assertTrue(np.allclose(np.dot(q_val.T, q_val), np.eye(K))) + self.assertTrue(np.alltrue(r_val == np.triu(r_val))) + self.assertTrue(np.allclose(a_val, np.dot(q_val, r_val))) + + for d1, d2 in [(123, dist.BLOCK_SIZE), (7, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, 7), (13, 21), (34, 35), (8, 7)]: + test_dist_qr(d1, d2) + test_dist_qr(d2, d1) + for _ in range(20): + d1 = np.random.randint(1, 35) + d2 = np.random.randint(1, 35) + test_dist_qr(d1, d2) + services.cleanup() if __name__ == '__main__': diff --git a/test/testrecv.py b/test/testrecv.py index 332572d3a..70f9099e9 100644 --- a/test/testrecv.py +++ b/test/testrecv.py @@ -25,7 +25,7 @@ if __name__ == '__main__': orchpy.register_module(single.linalg) orchpy.register_module(dist) orchpy.register_module(dist.random) - # orchpy.register_module(dist.linalg) + orchpy.register_module(dist.linalg) orchpy.register_module(sys.modules[__name__]) worker.main_loop()