From f43328f332c4a92d53d48bc218d7d8d0239e3dc1 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Mon, 26 Feb 2018 21:12:04 -0800 Subject: [PATCH] moved _default_index to remote fn (#1617) --- python/ray/dataframe/dataframe.py | 48 ++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 113d41510..7ecf89a84 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -40,7 +40,7 @@ class DataFrame(object): # this _index object is a pd.DataFrame # and we use that DataFrame's Index to index the rows. - self._index = self._default_index() + self._index = _default_index.remote(self) if index is not None: self.index = index @@ -67,21 +67,27 @@ class DataFrame(object): """ self._index.index = new_index - def _default_index(self): - """Create a default index, which is a RangeIndex + index = property(_get_index, _set_index) + + def _get__index(self): + """Get the _index for this DataFrame. Returns: - The pd.RangeIndex object that represents this DataFrame. + The default index. """ - dest_indices = {"partition": - [i for i in range(len(self._lengths)) - for j in range(self._lengths[i])], - "index_within_partition": - [j for i in range(len(self._lengths)) - for j in range(self._lengths[i])]} - return pd.DataFrame(dest_indices) + if isinstance(self._index_cache, ray.local_scheduler.ObjectID): + self._index_cache = ray.get(self._index_cache) + return self._index_cache - index = property(_get_index, _set_index) + def _set__index(self, new__index): + """Set the _index for this DataFrame. + + Args: + new__index: The new default index to set. + """ + self._index_cache = new__index + + _index = property(_get__index, _set__index) def _compute_lengths(self): """Updates the stored lengths of DataFrame partions @@ -1315,7 +1321,7 @@ class DataFrame(object): values, mask, np.nan) return values - new_index = new_obj._default_index().index + new_index = ray.get(_default_index.remote(new_obj)).index if level is not None: if not isinstance(level, (tuple, list)): level = [level] @@ -2107,3 +2113,19 @@ def to_pandas(df): pd_df.index = df.index pd_df.columns = df.columns return pd_df + + +@ray.remote +def _default_index(df): + """Create a default index, which is a RangeIndex + + Returns: + The pd.RangeIndex object that represents this DataFrame. + """ + dest_indices = {"partition": + [i for i in range(len(df._lengths)) + for j in range(df._lengths[i])], + "index_within_partition": + [j for i in range(len(df._lengths)) + for j in range(df._lengths[i])]} + return pd.DataFrame(dest_indices)