[DataFrame] Pass lengths to _default_index instead of df (#1621)

* Pass lengths to remote function over DataFrame

* Increasing performance by moving length to remote
This commit is contained in:
Kunal Gosar
2018-02-27 02:38:26 -08:00
committed by Devin Petersohn
parent 4ab16d7fb3
commit 34664dbf76
+20 -13
View File
@@ -35,12 +35,11 @@ class DataFrame(object):
assert(len(df) > 0)
self._df = df
self._compute_lengths()
self.columns = columns
# this _index object is a pd.DataFrame
# and we use that DataFrame's Index to index the rows.
self._index = _default_index.remote(self)
self._lengths, self._index = _compute_length_and_index.remote(self._df)
if index is not None:
self.index = index
@@ -101,7 +100,11 @@ class DataFrame(object):
Returns:
A list of integers representing the length of each partition.
"""
if isinstance(self._length_cache[0], ray.local_scheduler.ObjectID):
if isinstance(self._length_cache, ray.local_scheduler.ObjectID):
self._length_cache = ray.get(self._length_cache)
elif isinstance(self._length_cache, list) and \
isinstance(self._length_cache[0],
ray.local_scheduler.ObjectID):
self._length_cache = ray.get(self._length_cache)
return self._length_cache
@@ -227,8 +230,7 @@ class DataFrame(object):
if index:
self.index = index
self._compute_lengths()
self._index = self._default_index()
self._lengths, self._index = _compute_length_and_index.remote(self._df)
def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
@@ -1320,7 +1322,8 @@ class DataFrame(object):
values, mask, np.nan)
return values
new_index = ray.get(_default_index.remote(new_obj)).index
_, new_index = _compute_length_and_index.remote(new_obj._df)
new_index = ray.get(new_index).index
if level is not None:
if not isinstance(level, (tuple, list)):
level = [level]
@@ -2128,17 +2131,21 @@ def to_pandas(df):
return pd_df
@ray.remote
def _default_index(df):
@ray.remote(num_return_vals=2)
def _compute_length_and_index(dfs):
"""Create a default index, which is a RangeIndex
Returns:
The pd.RangeIndex object that represents this DataFrame.
"""
lengths = ray.get([_deploy_func.remote(_get_lengths, d)
for d in dfs])
dest_indices = {"partition":
[i for i in range(len(df._lengths))
for j in range(df._lengths[i])],
[i for i in range(len(lengths))
for j in range(lengths[i])],
"index_within_partition":
[j for i in range(len(df._lengths))
for j in range(df._lengths[i])]}
return pd.DataFrame(dest_indices)
[j for i in range(len(lengths))
for j in range(lengths[i])]}
return lengths, pd.DataFrame(dest_indices)