mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:06:31 +08:00
implement distributed qr (#30)
This commit is contained in:
committed by
Philipp Moritz
parent
bffae5a80e
commit
37ac8faae5
Vendored
+1
-1
@@ -1,2 +1,2 @@
|
||||
import random
|
||||
import random, linalg
|
||||
from core import *
|
||||
|
||||
Vendored
+121
-80
@@ -4,16 +4,16 @@ import arrays.single as single
|
||||
import orchpy as op
|
||||
|
||||
__all__ = ["BLOCK_SIZE", "DistArray", "assemble", "zeros", "ones", "copy",
|
||||
"eye", "triu", "tril", "blockwise_dot", "dot", "block_column", "block_row"]
|
||||
"eye", "triu", "tril", "blockwise_dot", "dot", "transpose", "add", "subtract", "eye2", "numpy_to_dist", "subblocks"]
|
||||
|
||||
BLOCK_SIZE = 10
|
||||
|
||||
class DistArray(object):
|
||||
def construct(self, shape, objrefs):
|
||||
def construct(self, shape, objrefs=None):
|
||||
self.shape = shape
|
||||
self.objrefs = objrefs
|
||||
self.ndim = len(shape)
|
||||
self.num_blocks = [int(np.ceil(1.0 * a / BLOCK_SIZE)) for a in self.shape]
|
||||
self.objrefs = objrefs if objrefs is not None else np.empty(self.num_blocks, dtype=object)
|
||||
if self.num_blocks != list(self.objrefs.shape):
|
||||
raise Exception("The fields `num_blocks` and `objrefs` are inconsistent, `num_blocks` is {} and `objrefs` has shape {}".format(self.num_blocks, list(self.objrefs.shape)))
|
||||
|
||||
@@ -24,20 +24,20 @@ class DistArray(object):
|
||||
def serialize(self):
|
||||
return (self.shape, self.objrefs)
|
||||
|
||||
def __init__(self):
|
||||
self.shape = None
|
||||
self.objrefs = None
|
||||
def __init__(self, shape=None):
|
||||
if shape is not None:
|
||||
self.construct(shape)
|
||||
|
||||
@staticmethod
|
||||
def compute_block_lower(index, shape):
|
||||
# TODO(rkn): Check that the entries of index are in the correct range.
|
||||
# TODO(rkn): Check that len(index) == len(shape).
|
||||
if len(index) != len(shape):
|
||||
raise Exception("The fields `index` and `shape` must have the same length, but `index` is {} and `shape` is {}.".format(index, shape))
|
||||
return [elem * BLOCK_SIZE for elem in index]
|
||||
|
||||
@staticmethod
|
||||
def compute_block_upper(index, shape):
|
||||
# TODO(rkn): Check that the entries of index are in the correct range.
|
||||
# TODO(rkn): Check that len(index) == len(shape).
|
||||
if len(index) != len(shape):
|
||||
raise Exception("The fields `index` and `shape` must have the same length, but `index` is {} and `shape` is {}.".format(index, shape))
|
||||
upper = []
|
||||
for i in range(len(shape)):
|
||||
upper.append(min((index[i] + 1) * BLOCK_SIZE, shape[i]))
|
||||
@@ -73,82 +73,87 @@ class DistArray(object):
|
||||
def assemble(a):
|
||||
return a.assemble()
|
||||
|
||||
# TODO(rkn): what should we call this method
|
||||
@op.distributed([np.ndarray], [DistArray])
|
||||
def numpy_to_dist(a):
|
||||
result = DistArray(a.shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
lower = DistArray.compute_block_lower(index, a.shape)
|
||||
upper = DistArray.compute_block_upper(index, a.shape)
|
||||
result.objrefs[index] = op.push(a[[slice(l, u) for (l, u) in zip(lower, upper)]])
|
||||
return result
|
||||
|
||||
@op.distributed([List[int], str], [DistArray])
|
||||
def zeros(shape, dtype_name):
|
||||
num_blocks = DistArray.compute_num_blocks(shape)
|
||||
objrefs = np.empty(num_blocks, dtype=object)
|
||||
for index in np.ndindex(*num_blocks):
|
||||
objrefs[index] = single.zeros(DistArray.compute_block_shape(index, shape), dtype_name)
|
||||
result = DistArray()
|
||||
result.construct(shape, objrefs)
|
||||
result = DistArray(shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
result.objrefs[index] = single.zeros(DistArray.compute_block_shape(index, shape), dtype_name)
|
||||
return result
|
||||
|
||||
@op.distributed([List[int], str], [DistArray])
|
||||
def ones(shape, dtype_name):
|
||||
num_blocks = DistArray.compute_num_blocks(shape)
|
||||
objrefs = np.empty(num_blocks, dtype=object)
|
||||
for index in np.ndindex(*num_blocks):
|
||||
objrefs[index] = single.ones(DistArray.compute_block_shape(index, shape), dtype_name)
|
||||
result = DistArray()
|
||||
result.construct(shape, objrefs)
|
||||
result = DistArray(shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
result.objrefs[index] = single.ones(DistArray.compute_block_shape(index, shape), dtype_name)
|
||||
return result
|
||||
|
||||
@op.distributed([DistArray], [DistArray])
|
||||
def copy(a):
|
||||
num_blocks = DistArray.compute_num_blocks(a.shape)
|
||||
objrefs = np.empty(num_blocks, dtype=object)
|
||||
for index in np.ndindex(*num_blocks):
|
||||
objrefs[index] = single.copy(a.objrefs[index])
|
||||
result = DistArray()
|
||||
result.construct(a.shape, objrefs)
|
||||
result = DistArray(a.shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
result.objrefs[index] = a.objrefs[index] # We don't need to actually copy the objects because cluster-level objects are assumed to be immutable.
|
||||
return result
|
||||
|
||||
@op.distributed([int, str], [DistArray])
|
||||
def eye(dim, dtype_name):
|
||||
shape = [dim, dim]
|
||||
num_blocks = DistArray.compute_num_blocks(shape)
|
||||
objrefs = np.empty(num_blocks, dtype=object)
|
||||
for (i, j) in np.ndindex(*num_blocks):
|
||||
result = DistArray(shape)
|
||||
for (i, j) in np.ndindex(*result.num_blocks):
|
||||
if i == j:
|
||||
objrefs[i, j] = single.eye(DistArray.compute_block_shape([i, j], shape)[0], dtype_name)
|
||||
result.objrefs[i, j] = single.eye(DistArray.compute_block_shape([i, j], shape)[0], dtype_name)
|
||||
else:
|
||||
objrefs[i, j] = single.zeros(DistArray.compute_block_shape([i, j], shape), dtype_name)
|
||||
result = DistArray()
|
||||
result.construct(shape, objrefs)
|
||||
result.objrefs[i, j] = single.zeros(DistArray.compute_block_shape([i, j], shape), dtype_name)
|
||||
return result
|
||||
|
||||
# TODO(rkn): Support optional arguments so that we can make this part of eye.
|
||||
@op.distributed([int, int, str], [DistArray])
|
||||
def eye2(dim1, dim2, dtype_name):
|
||||
shape = [dim1, dim2]
|
||||
result = DistArray(shape)
|
||||
for (i, j) in np.ndindex(*result.num_blocks):
|
||||
block_shape = DistArray.compute_block_shape([i, j], shape)
|
||||
if i == j:
|
||||
result.objrefs[i, j] = single.eye2(block_shape[0], block_shape[1], dtype_name)
|
||||
else:
|
||||
result.objrefs[i, j] = single.zeros(block_shape, dtype_name)
|
||||
return result
|
||||
|
||||
@op.distributed([DistArray], [DistArray])
|
||||
def triu(a):
|
||||
if a.ndim != 2:
|
||||
raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim))
|
||||
objrefs = np.empty(a.num_blocks, dtype=object)
|
||||
for i in range(a.num_blocks[0]):
|
||||
for j in range(a.num_blocks[1]):
|
||||
if i < j:
|
||||
objrefs[i, j] = single.copy(a.objrefs[i, j])
|
||||
elif i == j:
|
||||
objrefs[i, j] = single.triu(a.objrefs[i, j])
|
||||
else:
|
||||
objrefs[i, j] = single.zeros_like(a.objrefs[i, j])
|
||||
result = DistArray()
|
||||
result.construct(a.shape, objrefs)
|
||||
result = DistArray(a.shape)
|
||||
for (i, j) in np.ndindex(*result.num_blocks):
|
||||
if i < j:
|
||||
result.objrefs[i, j] = single.copy(a.objrefs[i, j])
|
||||
elif i == j:
|
||||
result.objrefs[i, j] = single.triu(a.objrefs[i, j])
|
||||
else:
|
||||
result.objrefs[i, j] = single.zeros_like(a.objrefs[i, j])
|
||||
return result
|
||||
|
||||
@op.distributed([DistArray], [DistArray])
|
||||
def tril(a):
|
||||
if a.ndim != 2:
|
||||
raise Exception("Input must have 2 dimensions, but a.ndim is " + str(a.ndim))
|
||||
objrefs = np.empty(a.num_blocks, dtype=object)
|
||||
for i in range(a.num_blocks[0]):
|
||||
for j in range(a.num_blocks[1]):
|
||||
if i > j:
|
||||
objrefs[i, j] = single.copy(a.objrefs[i, j])
|
||||
elif i == j:
|
||||
objrefs[i, j] = single.tril(a.objrefs[i, j])
|
||||
else:
|
||||
objrefs[i, j] = single.zeros_like(a.objrefs[i, j])
|
||||
result = DistArray()
|
||||
result.construct(a.shape, objrefs)
|
||||
result = DistArray(a.shape)
|
||||
for (i, j) in np.ndindex(*result.num_blocks):
|
||||
if i > j:
|
||||
result.objrefs[i, j] = single.copy(a.objrefs[i, j])
|
||||
elif i == j:
|
||||
result.objrefs[i, j] = single.tril(a.objrefs[i, j])
|
||||
else:
|
||||
result.objrefs[i, j] = single.zeros_like(a.objrefs[i, j])
|
||||
return result
|
||||
|
||||
@op.distributed([np.ndarray, None], [np.ndarray])
|
||||
@@ -171,34 +176,70 @@ def dot(a, b):
|
||||
if a.shape[1] != b.shape[0]:
|
||||
raise Exception("dot expects a.shape[1] to equal b.shape[0], but a.shape = {} and b.shape = {}.".format(a.shape, b.shape))
|
||||
shape = [a.shape[0], b.shape[1]]
|
||||
num_blocks = DistArray.compute_num_blocks(shape)
|
||||
objrefs = np.empty(num_blocks, dtype=object)
|
||||
for i in range(num_blocks[0]):
|
||||
for j in range(num_blocks[1]):
|
||||
args = list(a.objrefs[i, :]) + list(b.objrefs[:, j])
|
||||
objrefs[i, j] = blockwise_dot(*args)
|
||||
result = DistArray()
|
||||
result.construct(shape, objrefs)
|
||||
result = DistArray(shape)
|
||||
for (i, j) in np.ndindex(*result.num_blocks):
|
||||
args = list(a.objrefs[i, :]) + list(b.objrefs[:, j])
|
||||
result.objrefs[i, j] = blockwise_dot(*args)
|
||||
return result
|
||||
|
||||
# This is not in numpy, should we expose this?
|
||||
@op.distributed([DistArray], [DistArray])
|
||||
def block_column(a, col):
|
||||
if a.ndim != 2:
|
||||
raise Exception("block_column expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape))
|
||||
top_block_shape = DistArray.compute_block_shape([0, col])
|
||||
shape = [a.shape[0], top_block_shape[1]]
|
||||
result = DistArray()
|
||||
result.construct(shape, a.objrefs[:, col])
|
||||
@op.distributed([DistArray, List[int], None], [DistArray])
|
||||
def subblocks(a, *ranges):
|
||||
"""
|
||||
This function produces a distributed array from a subset of the blocks in the `a`. The result and `a` will have the same number of dimensions.For example,
|
||||
subblocks(a, [0, 1], [2, 4])
|
||||
will produce a DistArray whose objrefs are
|
||||
[[a.objrefs[0, 2], a.objrefs[0, 4]],
|
||||
[a.objrefs[1, 2], a.objrefs[1, 4]]]
|
||||
We allow the user to pass in an empty list [] to indicate the full range.
|
||||
"""
|
||||
ranges = list(ranges)
|
||||
if len(ranges) != a.ndim:
|
||||
raise Exception("sub_blocks expects to receive a number of ranges equal to a.ndim, but it received {} ranges and a.ndim = {}.".format(len(ranges), a.ndim))
|
||||
for i in range(len(ranges)):
|
||||
if ranges[i] == []: # We allow the user to pass in an empty list to indicate the full range
|
||||
ranges[i] = range(a.num_blocks[i])
|
||||
if not np.alltrue(ranges[i] == np.sort(ranges[i])):
|
||||
raise Exception("Ranges passed to sub_blocks must be sorted, but the {}th range is {}.".format(i, ranges[i]))
|
||||
if ranges[i][0] < 0:
|
||||
raise Exception("Values in the ranges passed to sub_blocks must be at least 0, but the {}th range is {}.".format(i, ranges[i]))
|
||||
if ranges[i][-1] >= a.num_blocks[i]:
|
||||
raise Exception("Values in the ranges passed to sub_blocks must be less than the relevant number of blocks, but the {}th range is {}, and a.num_blocks = {}.".format(i, ranges[i], a.num_blocks))
|
||||
last_index = [r[-1] for r in ranges]
|
||||
last_block_shape = DistArray.compute_block_shape(last_index, a.shape)
|
||||
shape = [(len(ranges[i]) - 1) * BLOCK_SIZE + last_block_shape[i] for i in range(a.ndim)]
|
||||
result = DistArray(shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
print tuple([ranges[i][index[i]] for i in range(a.ndim)])
|
||||
result.objrefs[index] = a.objrefs[tuple([ranges[i][index[i]] for i in range(a.ndim)])]
|
||||
return result
|
||||
|
||||
# This is not in numpy, should we expose this?
|
||||
@op.distributed([DistArray], [DistArray])
|
||||
def block_row(a, row):
|
||||
def transpose(a):
|
||||
if a.ndim != 2:
|
||||
raise Exception("block_row expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape))
|
||||
left_block_shape = DistArray.compute_block_shape([row, 0])
|
||||
shape = [left_block_shape[0], a.shape[1]]
|
||||
result = DistArray()
|
||||
result.construct(shape, a.objrefs[row, :])
|
||||
raise Exception("transpose expects its argument to be 2-dimensional, but a.ndim = {}, a.shape = {}.".format(a.ndim, a.shape))
|
||||
result = DistArray([a.shape[1], a.shape[0]])
|
||||
for i in range(result.num_blocks[0]):
|
||||
for j in range(result.num_blocks[1]):
|
||||
result.objrefs[i, j] = single.transpose(a.objrefs[j, i])
|
||||
return result
|
||||
|
||||
# TODO(rkn): support broadcasting?
|
||||
@op.distributed([DistArray, DistArray], [DistArray])
|
||||
def add(x1, x2):
|
||||
if x1.shape != x2.shape:
|
||||
raise Exception("add expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape))
|
||||
result = DistArray(x1.shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
result.objrefs[index] = single.add(x1.objrefs[index], x2.objrefs[index])
|
||||
return result
|
||||
|
||||
# TODO(rkn): support broadcasting?
|
||||
@op.distributed([DistArray, DistArray], [DistArray])
|
||||
def subtract(x1, x2):
|
||||
if x1.shape != x2.shape:
|
||||
raise Exception("subtract expects arguments `x1` and `x2` to have the same shape, but x1.shape = {}, and x2.shape = {}.".format(x1.shape, x2.shape))
|
||||
result = DistArray(x1.shape)
|
||||
for index in np.ndindex(*result.num_blocks):
|
||||
result.objrefs[index] = single.subtract(x1.objrefs[index], x2.objrefs[index])
|
||||
return result
|
||||
|
||||
Vendored
+192
@@ -0,0 +1,192 @@
|
||||
from typing import List
|
||||
|
||||
import numpy as np
|
||||
import arrays.single as single
|
||||
import orchpy as op
|
||||
|
||||
from core import *
|
||||
|
||||
__all__ = ["tsqr", "modified_lu", "tsqr_hr", "qr"]
|
||||
|
||||
@op.distributed([DistArray], [DistArray, np.ndarray])
|
||||
def tsqr(a):
|
||||
"""
|
||||
arguments:
|
||||
a: a distributed matrix
|
||||
Suppose that
|
||||
a.shape == (M, N)
|
||||
K == min(M, N)
|
||||
return values:
|
||||
q: DistArray, if q_full = op.context.pull(DistArray, q).assemble(), then
|
||||
q_full.shape == (M, K)
|
||||
np.allclose(np.dot(q_full.T, q_full), np.eye(K)) == True
|
||||
r: np.ndarray, if r_val = op.context.pull(np.ndarray, r), then
|
||||
r_val.shape == (K, N)
|
||||
np.allclose(r, np.triu(r)) == True
|
||||
"""
|
||||
if len(a.shape) != 2:
|
||||
raise Exception("tsqr requires len(a.shape) == 2, but a.shape is {}".format(a.shape))
|
||||
if a.num_blocks[1] != 1:
|
||||
raise Exception("tsqr requires a.num_blocks[1] == 1, but a.num_blocks is {}".format(a.num_blocks))
|
||||
|
||||
num_blocks = a.num_blocks[0]
|
||||
K = int(np.ceil(np.log2(num_blocks))) + 1
|
||||
q_tree = np.empty((num_blocks, K), dtype=object)
|
||||
current_rs = []
|
||||
for i in range(num_blocks):
|
||||
block = a.objrefs[i, 0]
|
||||
q, r = single.linalg.qr(block)
|
||||
q_tree[i, 0] = q
|
||||
current_rs.append(r)
|
||||
for j in range(1, K):
|
||||
new_rs = []
|
||||
for i in range(int(np.ceil(1.0 * len(current_rs) / 2))):
|
||||
stacked_rs = single.vstack(*current_rs[(2 * i):(2 * i + 2)])
|
||||
q, r = single.linalg.qr(stacked_rs)
|
||||
q_tree[i, j] = q
|
||||
new_rs.append(r)
|
||||
current_rs = new_rs
|
||||
assert len(current_rs) == 1, "len(current_rs) = " + str(len(current_rs))
|
||||
|
||||
q_result = DistArray()
|
||||
|
||||
# handle the special case in which the whole DistArray "a" fits in one block
|
||||
# and has fewer rows than columns, this is a bit ugly so think about how to
|
||||
# remove it
|
||||
if a.shape[0] >= a.shape[1]:
|
||||
q_shape = a.shape
|
||||
else:
|
||||
q_shape = [a.shape[0], a.shape[0]]
|
||||
q_num_blocks = DistArray.compute_num_blocks(q_shape)
|
||||
q_result = DistArray()
|
||||
q_objrefs = np.empty(q_num_blocks, dtype=object)
|
||||
q_result.construct(q_shape, q_objrefs)
|
||||
|
||||
# reconstruct output
|
||||
for i in range(num_blocks):
|
||||
q_block_current = q_tree[i, 0]
|
||||
ith_index = i
|
||||
for j in range(1, K):
|
||||
if np.mod(ith_index, 2) == 0:
|
||||
lower = [0, 0]
|
||||
upper = [a.shape[1], BLOCK_SIZE]
|
||||
else:
|
||||
lower = [a.shape[1], 0]
|
||||
upper = [2 * a.shape[1], BLOCK_SIZE]
|
||||
ith_index /= 2
|
||||
q_block_current = single.dot(q_block_current, single.subarray(q_tree[ith_index, j], lower, upper))
|
||||
q_result.objrefs[i] = q_block_current
|
||||
r = current_rs[0]
|
||||
return q_result, r
|
||||
|
||||
# TODO(rkn): This is unoptimized, we really want a block version of this.
|
||||
@op.distributed([DistArray], [DistArray, np.ndarray, np.ndarray])
|
||||
def modified_lu(q):
|
||||
"""
|
||||
Algorithm 5 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf
|
||||
takes a matrix q with orthonormal columns, returns l, u, s such that q - s = l * u
|
||||
arguments:
|
||||
q: a two dimensional orthonormal q
|
||||
return values:
|
||||
l: lower triangular
|
||||
u: upper triangular
|
||||
s: a diagonal matrix represented by its diagonal
|
||||
"""
|
||||
q = q.assemble()
|
||||
m, b = q.shape[0], q.shape[1]
|
||||
S = np.zeros(b)
|
||||
|
||||
q_work = np.copy(q)
|
||||
|
||||
for i in range(b):
|
||||
S[i] = -1 * np.sign(q_work[i, i])
|
||||
q_work[i, i] -= S[i]
|
||||
q_work[(i + 1):m, i] /= q_work[i, i] # scale ith column of L by diagonal element
|
||||
q_work[(i + 1):m, (i + 1):b] -= np.outer(q_work[(i + 1):m, i], q_work[i, (i + 1):b]) # perform Schur complement update
|
||||
|
||||
L = np.tril(q_work)
|
||||
for i in range(b):
|
||||
L[i, i] = 1
|
||||
U = np.triu(q_work)[:b, :]
|
||||
return numpy_to_dist(op.push(L)), U, S # TODO(rkn): get rid of push and pull
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray, np.ndarray, int], [np.ndarray, np.ndarray])
|
||||
def tsqr_hr_helper1(u, s, y_top_block, b):
|
||||
y_top = y_top_block[:b, :b]
|
||||
s_full = np.diag(s)
|
||||
t = -1 * np.dot(u, np.dot(s_full, np.linalg.inv(y_top).T))
|
||||
return t, y_top
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray], [np.ndarray])
|
||||
def tsqr_hr_helper2(s, r_temp):
|
||||
s_full = np.diag(s)
|
||||
return np.dot(s_full, r_temp)
|
||||
|
||||
@op.distributed([DistArray], [DistArray, np.ndarray, np.ndarray, np.ndarray])
|
||||
def tsqr_hr(a):
|
||||
"""Algorithm 6 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf"""
|
||||
q, r_temp = tsqr(a)
|
||||
y, u, s = modified_lu(q)
|
||||
y_blocked = op.pull(y)
|
||||
t, y_top = tsqr_hr_helper1(u, s, y_blocked.objrefs[0, 0], a.shape[1])
|
||||
r = tsqr_hr_helper2(s, r_temp)
|
||||
return y, t, y_top, r
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray, np.ndarray, np.ndarray], [np.ndarray])
|
||||
def qr_helper1(a_rc, y_ri, t, W_c):
|
||||
return a_rc - np.dot(y_ri, np.dot(t.T, W_c))
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray], [np.ndarray])
|
||||
def qr_helper2(y_ri, a_rc):
|
||||
return np.dot(y_ri.T, a_rc)
|
||||
|
||||
@op.distributed([DistArray], [DistArray, DistArray])
|
||||
def qr(a):
|
||||
"""Algorithm 7 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf"""
|
||||
m, n = a.shape[0], a.shape[1]
|
||||
k = min(m, n)
|
||||
|
||||
# we will store our scratch work in a_work
|
||||
a_work = DistArray()
|
||||
a_work.construct(a.shape, np.copy(a.objrefs))
|
||||
|
||||
result_dtype = np.linalg.qr(op.pull(a.objrefs[0, 0]))[0].dtype.name
|
||||
r_res = op.pull(zeros([k, n], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it.
|
||||
y_res = op.pull(zeros([m, k], result_dtype)) # TODO(rkn): It would be preferable not to pull this right after creating it.
|
||||
Ts = []
|
||||
|
||||
for i in range(min(a.num_blocks[0], a.num_blocks[1])): # this differs from the paper, which says "for i in range(a.num_blocks[1])", but that doesn't seem to make any sense when a.num_blocks[1] > a.num_blocks[0]
|
||||
sub_dist_array = subblocks(a_work, range(i, a_work.num_blocks[0]), [i])
|
||||
y, t, _, R = tsqr_hr(sub_dist_array)
|
||||
y_val = op.pull(y)
|
||||
|
||||
for j in range(i, a.num_blocks[0]):
|
||||
y_res.objrefs[j, i] = y_val.objrefs[j - i, 0]
|
||||
if a.shape[0] > a.shape[1]:
|
||||
# in this case, R needs to be square
|
||||
R_shape = op.pull(single.shape(R))
|
||||
eye_temp = single.eye2(R_shape[1], R_shape[0], result_dtype)
|
||||
r_res.objrefs[i, i] = single.dot(eye_temp, R)
|
||||
else:
|
||||
r_res.objrefs[i, i] = R
|
||||
Ts.append(numpy_to_dist(t))
|
||||
|
||||
for c in range(i + 1, a.num_blocks[1]):
|
||||
W_rcs = []
|
||||
for r in range(i, a.num_blocks[0]):
|
||||
y_ri = y_val.objrefs[r - i, 0]
|
||||
W_rcs.append(qr_helper2(y_ri, a_work.objrefs[r, c]))
|
||||
W_c = single.sum(0, *W_rcs)
|
||||
for r in range(i, a.num_blocks[0]):
|
||||
y_ri = y_val.objrefs[r - i, 0]
|
||||
A_rc = qr_helper1(a_work.objrefs[r, c], y_ri, t, W_c)
|
||||
a_work.objrefs[r, c] = A_rc
|
||||
r_res.objrefs[i, c] = a_work.objrefs[i, c]
|
||||
|
||||
# construct q_res from Ys and Ts
|
||||
q = eye2(m, k, result_dtype)
|
||||
for i in range(len(Ts))[::-1]:
|
||||
y_col_block = subblocks(y_res, [], [i])
|
||||
q = subtract(q, dot(y_col_block, dot(Ts[i], dot(transpose(y_col_block), q))))
|
||||
|
||||
return q, r_res
|
||||
@@ -1,2 +1,2 @@
|
||||
import random, linalg
|
||||
from core import zeros, zeros_like, ones, eye, dot, vstack, hstack, subarray, copy, tril, triu
|
||||
from core import *
|
||||
|
||||
@@ -2,6 +2,8 @@ from typing import List
|
||||
import numpy as np
|
||||
import orchpy as op
|
||||
|
||||
__all__ = ["zeros", "zeros_like", "ones", "eye", "dot", "vstack", "hstack", "subarray", "copy", "tril", "triu", "diag", "transpose", "add", "subtract", "eye2", "sum", "shape"]
|
||||
|
||||
@op.distributed([List[int], str], [np.ndarray])
|
||||
def zeros(shape, dtype_name):
|
||||
return np.zeros(shape, dtype=np.dtype(dtype_name))
|
||||
@@ -18,6 +20,11 @@ def ones(shape, dtype_name):
|
||||
def eye(dim, dtype_name):
|
||||
return np.eye(dim, dtype=np.dtype(dtype_name))
|
||||
|
||||
# TODO(rkn): This should be part of eye
|
||||
@op.distributed([int, int, str], [np.ndarray])
|
||||
def eye2(dim1, dim2, dtype_name):
|
||||
return np.eye(dim1, dim2, dtype=np.dtype(dtype_name))
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray], [np.ndarray])
|
||||
def dot(a, b):
|
||||
return np.dot(a, b)
|
||||
@@ -49,3 +56,27 @@ def tril(a):
|
||||
@op.distributed([np.ndarray], [np.ndarray])
|
||||
def triu(a):
|
||||
return np.triu(a)
|
||||
|
||||
@op.distributed([np.ndarray], [np.ndarray])
|
||||
def diag(a):
|
||||
return np.diag(a)
|
||||
|
||||
@op.distributed([np.ndarray], [np.ndarray])
|
||||
def transpose(a):
|
||||
return np.transpose(a)
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray], [np.ndarray])
|
||||
def add(x1, x2):
|
||||
return np.add(x1, x2)
|
||||
|
||||
@op.distributed([np.ndarray, np.ndarray], [np.ndarray])
|
||||
def subtract(x1, x2):
|
||||
return np.subtract(x1, x2)
|
||||
|
||||
@op.distributed([int, np.ndarray, None], [np.ndarray])
|
||||
def sum(axis, *xs):
|
||||
return np.sum(xs, axis=axis)
|
||||
|
||||
@op.distributed([np.ndarray], [tuple])
|
||||
def shape(a):
|
||||
return np.shape(a)
|
||||
|
||||
@@ -86,39 +86,3 @@ def matrix_rank(M):
|
||||
@op.distributed([np.ndarray, None], [np.ndarray])
|
||||
def multi_dot(a):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
|
||||
# This isn't in numpy, should we expose it?
|
||||
@op.distributed([np.ndarray], [np.ndarray, np.ndarray, np.ndarray])
|
||||
def modified_lu(q):
|
||||
"""
|
||||
Algorithm 5 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-175.pdf
|
||||
takes a matrix q with orthonormal columns, returns l, u, s such that q - s = l * u
|
||||
arguments:
|
||||
q: a two dimensional orthonormal q
|
||||
return values:
|
||||
l: lower triangular
|
||||
u: upper triangular
|
||||
s: a diagonal matrix represented by its diagonal
|
||||
"""
|
||||
m, b = q.shape[0], q.shape[1]
|
||||
S = np.zeros(b)
|
||||
|
||||
q_work = np.copy(q)
|
||||
|
||||
for i in range(b):
|
||||
S[i] = -1 * np.sign(q_work[i, i])
|
||||
q_work[i, i] -= S[i]
|
||||
|
||||
# scale ith column of L by diagonal element
|
||||
q_work[(i + 1):m, i] /= q_work[i, i]
|
||||
|
||||
# perform Schur complement update
|
||||
q_work[(i + 1):m, (i + 1):b] -= np.outer(q_work[(i + 1):m, i], q_work[i, (i + 1):b])
|
||||
|
||||
L = np.tril(q_work)
|
||||
for i in range(b):
|
||||
L[i, i] = 1
|
||||
U = np.triu(q_work)[:b, :]
|
||||
return L, U, S
|
||||
|
||||
@@ -99,10 +99,10 @@ def distributed(arg_types, return_types, worker=global_worker):
|
||||
def distributed_decorator(func):
|
||||
def func_executor(arguments):
|
||||
"""This is what gets executed remotely on a worker after a distributed function is scheduled by the scheduler."""
|
||||
print "Calling function {} with arguments {}".format(func.__name__, arguments)
|
||||
print "Calling function {}".format(func.__name__)
|
||||
result = func(*arguments)
|
||||
check_return_values(func_call, result) # throws an exception if result is invalid
|
||||
print "Finished executing function {} with arguments {}".format(func.__name__, arguments)
|
||||
print "Finished executing function {}".format(func.__name__)
|
||||
return result
|
||||
def func_call(*args):
|
||||
"""This is what gets run immediately when a worker calls a distributed function."""
|
||||
|
||||
+122
-1
@@ -82,7 +82,7 @@ class ArraysDistTest(unittest.TestCase):
|
||||
def testMethods(self):
|
||||
test_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
test_path = os.path.join(test_dir, "testrecv.py")
|
||||
services.start_cluster(num_workers=4, worker_path=test_path)
|
||||
services.start_cluster(num_workers=8, worker_path=test_path)
|
||||
|
||||
x = dist.zeros([9, 25, 51], "float")
|
||||
y = dist.assemble(x)
|
||||
@@ -123,6 +123,127 @@ class ArraysDistTest(unittest.TestCase):
|
||||
np.allclose(orchpy.pull(w), np.dot(orchpy.pull(u), orchpy.pull(v)))
|
||||
self.assertTrue(np.allclose(orchpy.pull(w), np.dot(orchpy.pull(u), orchpy.pull(v))))
|
||||
|
||||
# test add
|
||||
x = dist.random.normal([23, 42])
|
||||
y = dist.random.normal([23, 42])
|
||||
z = dist.add(x, y)
|
||||
z_full = dist.assemble(z)
|
||||
x_full = dist.assemble(x)
|
||||
y_full = dist.assemble(y)
|
||||
self.assertTrue(np.allclose(orchpy.pull(z_full), orchpy.pull(x_full) + orchpy.pull(y_full)))
|
||||
|
||||
# test subtract
|
||||
x = dist.random.normal([33, 40])
|
||||
y = dist.random.normal([33, 40])
|
||||
z = dist.subtract(x, y)
|
||||
z_full = dist.assemble(z)
|
||||
x_full = dist.assemble(x)
|
||||
y_full = dist.assemble(y)
|
||||
self.assertTrue(np.allclose(orchpy.pull(z_full), orchpy.pull(x_full) - orchpy.pull(y_full)))
|
||||
|
||||
# test transpose
|
||||
x = dist.random.normal([234, 432])
|
||||
y = dist.transpose(x)
|
||||
x_full = dist.assemble(x)
|
||||
y_full = dist.assemble(y)
|
||||
self.assertTrue(np.alltrue(orchpy.pull(x_full).T == orchpy.pull(y_full)))
|
||||
|
||||
# test numpy_to_dist
|
||||
x = dist.random.normal([23, 45])
|
||||
y = dist.assemble(x)
|
||||
z = dist.numpy_to_dist(y)
|
||||
w = dist.assemble(z)
|
||||
x_full = dist.assemble(x)
|
||||
z_full = dist.assemble(z)
|
||||
self.assertTrue(np.alltrue(orchpy.pull(x_full) == orchpy.pull(z_full)))
|
||||
self.assertTrue(np.alltrue(orchpy.pull(y) == orchpy.pull(w)))
|
||||
|
||||
# test dist.tsqr
|
||||
for shape in [[123, dist.BLOCK_SIZE], [7, dist.BLOCK_SIZE], [dist.BLOCK_SIZE, dist.BLOCK_SIZE], [dist.BLOCK_SIZE, 7], [10 * dist.BLOCK_SIZE, dist.BLOCK_SIZE]]:
|
||||
x = dist.random.normal(shape)
|
||||
K = min(shape)
|
||||
q, r = dist.linalg.tsqr(x)
|
||||
x_full = dist.assemble(x)
|
||||
x_val = orchpy.pull(x_full)
|
||||
q_full = dist.assemble(q)
|
||||
q_val = orchpy.pull(q_full)
|
||||
r_val = orchpy.pull(r)
|
||||
self.assertTrue(r_val.shape == (K, shape[1]))
|
||||
self.assertTrue(np.alltrue(r_val == np.triu(r_val)))
|
||||
self.assertTrue(np.allclose(x_val, np.dot(q_val, r_val)))
|
||||
self.assertTrue(np.allclose(np.dot(q_val.T, q_val), np.eye(K)))
|
||||
|
||||
# test dist.linalg.modified_lu
|
||||
def test_modified_lu(d1, d2):
|
||||
print "testing dist_modified_lu with d1 = " + str(d1) + ", d2 = " + str(d2)
|
||||
assert d1 >= d2
|
||||
k = min(d1, d2)
|
||||
m = single.random.normal([d1, d2])
|
||||
q, r = single.linalg.qr(m)
|
||||
l, u, s = dist.linalg.modified_lu(dist.numpy_to_dist(q))
|
||||
q_val = orchpy.pull(q)
|
||||
r_val = orchpy.pull(r)
|
||||
l_full = dist.assemble(l)
|
||||
l_val = orchpy.pull(l_full)
|
||||
u_val = orchpy.pull(u)
|
||||
s_val = orchpy.pull(s)
|
||||
s_mat = np.zeros((d1, d2))
|
||||
for i in range(len(s_val)):
|
||||
s_mat[i, i] = s_val[i]
|
||||
self.assertTrue(np.allclose(q_val - s_mat, np.dot(l_val, u_val))) # check that q - s = l * u
|
||||
self.assertTrue(np.alltrue(np.triu(u_val) == u_val)) # check that u is upper triangular
|
||||
self.assertTrue(np.alltrue(np.tril(l_val) == l_val)) # check that l is lower triangular
|
||||
|
||||
for d1, d2 in [(100, 100), (99, 98), (7, 5), (7, 7), (20, 7), (20, 10)]:
|
||||
test_modified_lu(d1, d2)
|
||||
|
||||
# test dist_tsqr_hr
|
||||
def test_dist_tsqr_hr(d1, d2):
|
||||
print "testing dist_tsqr_hr with d1 = " + str(d1) + ", d2 = " + str(d2)
|
||||
a = dist.random.normal([d1, d2])
|
||||
y, t, y_top, r = dist.linalg.tsqr_hr(a)
|
||||
a_full = dist.assemble(a)
|
||||
a_val = orchpy.pull(a_full)
|
||||
y_full = dist.assemble(y)
|
||||
y_val = orchpy.pull(y_full)
|
||||
t_val = orchpy.pull(t)
|
||||
y_top_val = orchpy.pull(y_top)
|
||||
r_val = orchpy.pull(r)
|
||||
tall_eye = np.zeros((d1, min(d1, d2)))
|
||||
np.fill_diagonal(tall_eye, 1)
|
||||
q = tall_eye - np.dot(y_val, np.dot(t_val, y_top_val.T))
|
||||
self.assertTrue(np.allclose(np.dot(q.T, q), np.eye(min(d1, d2)))) # check that q.T * q = I
|
||||
self.assertTrue(np.allclose(np.dot(q, r_val), a_val)) # check that a = (I - y * t * y_top.T) * r
|
||||
|
||||
for d1, d2 in [(123, dist.BLOCK_SIZE), (7, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, 7), (10 * dist.BLOCK_SIZE, dist.BLOCK_SIZE)]:
|
||||
test_dist_tsqr_hr(d1, d2)
|
||||
|
||||
def test_dist_qr(d1, d2):
|
||||
print "testing qr with d1 = {}, and d2 = {}.".format(d1, d2)
|
||||
a = dist.random.normal([d1, d2])
|
||||
K = min(d1, d2)
|
||||
q, r = dist.linalg.qr(a)
|
||||
a_full = dist.assemble(a)
|
||||
q_full = dist.assemble(q)
|
||||
r_full = dist.assemble(r)
|
||||
a_val = orchpy.pull(a_full)
|
||||
q_val = orchpy.pull(q_full)
|
||||
r_val = orchpy.pull(r_full)
|
||||
|
||||
self.assertTrue(q_val.shape == (d1, K))
|
||||
self.assertTrue(r_val.shape == (K, d2))
|
||||
self.assertTrue(np.allclose(np.dot(q_val.T, q_val), np.eye(K)))
|
||||
self.assertTrue(np.alltrue(r_val == np.triu(r_val)))
|
||||
self.assertTrue(np.allclose(a_val, np.dot(q_val, r_val)))
|
||||
|
||||
for d1, d2 in [(123, dist.BLOCK_SIZE), (7, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, dist.BLOCK_SIZE), (dist.BLOCK_SIZE, 7), (13, 21), (34, 35), (8, 7)]:
|
||||
test_dist_qr(d1, d2)
|
||||
test_dist_qr(d2, d1)
|
||||
for _ in range(20):
|
||||
d1 = np.random.randint(1, 35)
|
||||
d2 = np.random.randint(1, 35)
|
||||
test_dist_qr(d1, d2)
|
||||
|
||||
services.cleanup()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
+1
-1
@@ -25,7 +25,7 @@ if __name__ == '__main__':
|
||||
orchpy.register_module(single.linalg)
|
||||
orchpy.register_module(dist)
|
||||
orchpy.register_module(dist.random)
|
||||
# orchpy.register_module(dist.linalg)
|
||||
orchpy.register_module(dist.linalg)
|
||||
orchpy.register_module(sys.modules[__name__])
|
||||
|
||||
worker.main_loop()
|
||||
|
||||
Reference in New Issue
Block a user