From 34664dbf76d07de1a1458fef813a662ababc9046 Mon Sep 17 00:00:00 2001 From: Kunal Gosar Date: Tue, 27 Feb 2018 02:38:26 -0800 Subject: [PATCH] [DataFrame] Pass lengths to _default_index instead of df (#1621) * Pass lengths to remote function over DataFrame * Increasing performance by moving length to remote --- python/ray/dataframe/dataframe.py | 33 +++++++++++++++++++------------ 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index d3fb4c98f..32ea12d11 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -35,12 +35,11 @@ class DataFrame(object): assert(len(df) > 0) self._df = df - self._compute_lengths() self.columns = columns # this _index object is a pd.DataFrame # and we use that DataFrame's Index to index the rows. - self._index = _default_index.remote(self) + self._lengths, self._index = _compute_length_and_index.remote(self._df) if index is not None: self.index = index @@ -101,7 +100,11 @@ class DataFrame(object): Returns: A list of integers representing the length of each partition. """ - if isinstance(self._length_cache[0], ray.local_scheduler.ObjectID): + if isinstance(self._length_cache, ray.local_scheduler.ObjectID): + self._length_cache = ray.get(self._length_cache) + elif isinstance(self._length_cache, list) and \ + isinstance(self._length_cache[0], + ray.local_scheduler.ObjectID): self._length_cache = ray.get(self._length_cache) return self._length_cache @@ -227,8 +230,7 @@ class DataFrame(object): if index: self.index = index - self._compute_lengths() - self._index = self._default_index() + self._lengths, self._index = _compute_length_and_index.remote(self._df) def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -1320,7 +1322,8 @@ class DataFrame(object): values, mask, np.nan) return values - new_index = ray.get(_default_index.remote(new_obj)).index + _, new_index = _compute_length_and_index.remote(new_obj._df) + new_index = ray.get(new_index).index if level is not None: if not isinstance(level, (tuple, list)): level = [level] @@ -2128,17 +2131,21 @@ def to_pandas(df): return pd_df -@ray.remote -def _default_index(df): +@ray.remote(num_return_vals=2) +def _compute_length_and_index(dfs): """Create a default index, which is a RangeIndex Returns: The pd.RangeIndex object that represents this DataFrame. """ + lengths = ray.get([_deploy_func.remote(_get_lengths, d) + for d in dfs]) + dest_indices = {"partition": - [i for i in range(len(df._lengths)) - for j in range(df._lengths[i])], + [i for i in range(len(lengths)) + for j in range(lengths[i])], "index_within_partition": - [j for i in range(len(df._lengths)) - for j in range(df._lengths[i])]} - return pd.DataFrame(dest_indices) + [j for i in range(len(lengths)) + for j in range(lengths[i])]} + + return lengths, pd.DataFrame(dest_indices)