moved _default_index to remote fn (#1617)

This commit is contained in:
Kunal Gosar
2018-02-26 21:12:04 -08:00
committed by Devin Petersohn
parent 48bd7b147d
commit f43328f332
+35 -13
View File
@@ -40,7 +40,7 @@ class DataFrame(object):
# this _index object is a pd.DataFrame
# and we use that DataFrame's Index to index the rows.
self._index = self._default_index()
self._index = _default_index.remote(self)
if index is not None:
self.index = index
@@ -67,21 +67,27 @@ class DataFrame(object):
"""
self._index.index = new_index
def _default_index(self):
"""Create a default index, which is a RangeIndex
index = property(_get_index, _set_index)
def _get__index(self):
"""Get the _index for this DataFrame.
Returns:
The pd.RangeIndex object that represents this DataFrame.
The default index.
"""
dest_indices = {"partition":
[i for i in range(len(self._lengths))
for j in range(self._lengths[i])],
"index_within_partition":
[j for i in range(len(self._lengths))
for j in range(self._lengths[i])]}
return pd.DataFrame(dest_indices)
if isinstance(self._index_cache, ray.local_scheduler.ObjectID):
self._index_cache = ray.get(self._index_cache)
return self._index_cache
index = property(_get_index, _set_index)
def _set__index(self, new__index):
"""Set the _index for this DataFrame.
Args:
new__index: The new default index to set.
"""
self._index_cache = new__index
_index = property(_get__index, _set__index)
def _compute_lengths(self):
"""Updates the stored lengths of DataFrame partions
@@ -1315,7 +1321,7 @@ class DataFrame(object):
values, mask, np.nan)
return values
new_index = new_obj._default_index().index
new_index = ray.get(_default_index.remote(new_obj)).index
if level is not None:
if not isinstance(level, (tuple, list)):
level = [level]
@@ -2107,3 +2113,19 @@ def to_pandas(df):
pd_df.index = df.index
pd_df.columns = df.columns
return pd_df
@ray.remote
def _default_index(df):
"""Create a default index, which is a RangeIndex
Returns:
The pd.RangeIndex object that represents this DataFrame.
"""
dest_indices = {"partition":
[i for i in range(len(df._lengths))
for j in range(df._lengths[i])],
"index_within_partition":
[j for i in range(len(df._lengths))
for j in range(df._lengths[i])]}
return pd.DataFrame(dest_indices)