From de6fa02c85090ccf68c79bb35d48026b4df31175 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Wed, 21 Feb 2018 08:46:37 -0800 Subject: [PATCH] [DataFrame] Fix transpose with nan values and add functionality needed for Index (#1545) --- python/ray/dataframe/dataframe.py | 105 +++++++++++++++----- python/ray/dataframe/index.py | 51 ++++++++-- python/ray/dataframe/test/test_dataframe.py | 56 ++++++++++- 3 files changed, 174 insertions(+), 38 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index f1922dbd4..132c53047 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -7,10 +7,12 @@ import numpy as np import ray import itertools +from .index import Index + class DataFrame(object): - def __init__(self, df, columns): + def __init__(self, df, columns, index=None): """Distributed DataFrame object backed by Pandas dataframes. Args: @@ -22,29 +24,53 @@ class DataFrame(object): assert(len(df) > 0) self._df = df + # TODO: Clean up later. + # We will call get only when we access the object (and only once). + self._lengths = \ + ray.get([_deploy_func.remote(_get_lengths, d) for d in self._df]) self.columns = columns + if index is None: + self._index = self._default_index() + else: + self._index = index + + self._pd_index = None + def __str__(self): return "ray.DataFrame object" def __repr__(self): return "ray.DataFrame object" - @property - def index(self): + def _get_index(self): """Get the index for this DataFrame. Returns: The union of all indexes across the partitions. """ - indices = ray.get(self._map_partitions(lambda df: df.index)._df) - if isinstance(indices[0], pd.RangeIndex): - merged = indices[0] - for index in indices[1:]: - merged = merged.union(index) - return merged - else: - return indices[0].append(indices[1:]) + if self._pd_index is None: + self._pd_index = Index.to_pandas(self._index) + + return self._pd_index + + def _set_index(self, new_index): + """Set the index for this DataFrame. + + Args: + new_index: The new index to set this + """ + self._pd_index = None + self._index = Index.from_pandas(new_index, self._lengths) + + def _default_index(self): + dest_indices = [(i, j) + for i in range(len(self._lengths)) + for j in range(self._lengths[i])] + return Index({i: dest_indices[i] for i in range(len(dest_indices))}, + pd.RangeIndex) + + index = property(_get_index, _set_index) @property def size(self): @@ -140,7 +166,7 @@ class DataFrame(object): assert(callable(func)) new_df = [_deploy_func.remote(func, part) for part in self._df] - return DataFrame(new_df, self.columns) + return DataFrame(new_df, self.columns, index=self._index) def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -150,7 +176,7 @@ class DataFrame(object): """ new_dfs = self._map_partitions(lambda df: df.add_prefix(prefix)) new_cols = self.columns.map(lambda x: str(prefix) + str(x)) - return DataFrame(new_dfs._df, new_cols) + return DataFrame(new_dfs._df, new_cols, index=self._index) def add_suffix(self, suffix): """Add a suffix to each of the column names. @@ -160,7 +186,7 @@ class DataFrame(object): """ new_dfs = self._map_partitions(lambda df: df.add_suffix(suffix)) new_cols = self.columns.map(lambda x: str(x) + str(suffix)) - return DataFrame(new_dfs._df, new_cols) + return DataFrame(new_dfs._df, new_cols, index=self._index) def applymap(self, func): """Apply a function to a DataFrame elementwise. @@ -177,7 +203,7 @@ class DataFrame(object): Returns: A new DataFrame pointing to the same partitions as this one. """ - return DataFrame(self._df, self.columns) + return DataFrame(self._df, self.columns, index=self._index) def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, group_keys=True, squeeze=False, **kwargs): @@ -199,11 +225,8 @@ class DataFrame(object): [index for df in ray.get(self._df) for index in list(df.index)])) chunksize = int(len(indices) / len(self._df)) - partitions = [] - - for df in self._df: - partitions.append(_shuffle.remote(df, indices, chunksize)) - + partitions = [_shuffle.remote(df, indices, chunksize) + for df in self._df] partitions = ray.get(partitions) # Transpose the list of dataframes @@ -213,7 +236,6 @@ class DataFrame(object): shuffle.append([]) for j in range(len(partitions)): shuffle[i].append(partitions[j][i]) - new_dfs = [_local_groupby.remote(part, axis=axis) for part in shuffle] return DataFrame(new_dfs, self.columns) @@ -311,8 +333,10 @@ class DataFrame(object): """ local_transpose = self._map_partitions( lambda df: df.transpose(*args, **kwargs)) + # Sum will collapse the NAs from the groupby - return local_transpose.reduce_by_index(lambda df: df.sum(), axis=1) + return local_transpose.reduce_by_index( + lambda df: df.apply(lambda x: x), axis=1) T = property(transpose) @@ -1502,6 +1526,24 @@ class DataFrame(object): raise NotImplementedError("Not Yet implemented.") +def _get_lengths(df): + """Gets the length of the dataframe. + + Args: + df: A remote pd.DataFrame object. + + Returns: + Returns an integer length of the dataframe object. If the attempt + fails, returns 0 as the length. + """ + try: + return len(df) + # Because we sometimes have cases where we have summary statistics in our + # DataFrames + except TypeError: + return 0 + + @ray.remote def _shuffle(df, indices, chunksize): """Shuffle data by sending it through the Ray Store. @@ -1518,12 +1560,12 @@ def _shuffle(df, indices, chunksize): i = 0 partition = [] while len(indices) > chunksize: - oids = df.reindex(indices[:chunksize]).dropna() + oids = df.reindex(indices[:chunksize]) partition.append(oids) indices = indices[chunksize:] i += 1 else: - oids = df.reindex(indices).dropna() + oids = df.reindex(indices) partition.append(oids) return partition @@ -1581,16 +1623,27 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True): elif chunksize is None: raise ValueError("The number of partitions or chunksize must be set.") + old_index = df.index + # TODO stop reassigning df dataframes = [] + lengths = [] while len(df) > chunksize: - top = ray.put(df[:chunksize]) + t_df = df[:chunksize] + lengths.append(len(t_df)) + # reindex here because we want a pd.RangeIndex within the partitions. + # It is smaller and sometimes faster. + t_df.reindex() + top = ray.put(t_df) dataframes.append(top) df = df[chunksize:] else: dataframes.append(ray.put(df)) + lengths.append(len(df)) - return DataFrame(dataframes, df.columns) + ray_index = Index.from_pandas(old_index, lengths) + + return DataFrame(dataframes, df.columns, index=ray_index) def to_pandas(df): diff --git a/python/ray/dataframe/index.py b/python/ray/dataframe/index.py index 790c53f7e..e42426a6c 100644 --- a/python/ray/dataframe/index.py +++ b/python/ray/dataframe/index.py @@ -7,15 +7,50 @@ import pandas as pd class Index(object): - def __init__(self, idx): + def __init__(self, idx, pandas_type): self.idx = idx + self.pandas_type = pandas_type + + def __getitem__(self, item): + return self.idx[item] + + def __len__(self): + return len(self.idx) @classmethod - def to_pandas(indices): - if isinstance(indices[0], pd.RangeIndex): - merged = indices[0] - for index in indices[1:]: - merged = merged.union(index) - return merged + def to_pandas(cls, index): + """Convert a Ray Index object to a Pandas Index object. + + Args: + index (ray.Index): A Ray Index object. + + Returns: + A pandas Index object. + """ + k = index.idx.keys() + if index.pandas_type is pd.RangeIndex: + return pd.RangeIndex(min(k), max(k) + 1) else: - return indices[0].append(indices[1:]) + return pd.Index(k) + + @classmethod + def from_pandas(cls, pd_index, lengths): + """Convert a Pandas Index object to a Ray Index object. + + Args: + pd_index (pd.Index): A Pandas Index object. + lengths ([int]): A list of lengths for the partitions. + + Returns: + A Ray Index object. + """ + dest_indices = [(i, j) + for i in range(len(lengths)) + for j in range(lengths[i])] + if len(pd_index) != len(dest_indices): + raise ValueError( + "Length of index given does not match current dataframe") + + return Index( + {pd_index[i]: dest_indices[i] for i in range(len(dest_indices))}, + type(pd_index)) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 19462a702..c3c9af0d4 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -22,6 +22,12 @@ def test_roundtrip(ray_df, pandas_df): @pytest.fixture def test_index(ray_df, pandas_df): assert(ray_df.index.equals(pandas_df.index)) + ray_df_cp = ray_df.copy() + pandas_df_cp = pandas_df.copy() + + ray_df_cp.index = [str(i) for i in ray_df_cp.index] + pandas_df_cp.index = [str(i) for i in pandas_df_cp.index] + assert(ray_df_cp.index.sort_values().equals(pandas_df_cp.index)) @pytest.fixture @@ -41,10 +47,7 @@ def test_ftypes(ray_df, pandas_df): @pytest.fixture def test_values(ray_df, pandas_df): - a = np.ndarray.flatten(ray_df.values) - b = np.ndarray.flatten(pandas_df.values) - for c, d in zip(a, b): - assert(c == d or (np.isnan(c) and np.isnan(d))) + np.testing.assert_equal(ray_df.values, pandas_df.values) @pytest.fixture @@ -339,6 +342,51 @@ def test_mixed_dtype_dataframe(): test_notnull(ray_df, pandas_df) +def test_nan_dataframe(): + pandas_df = pd.DataFrame({ + 'col1': [1, 2, 3, np.nan], + 'col2': [4, 5, np.nan, 7], + 'col3': [8, np.nan, 10, 11], + 'col4': [np.nan, 13, 14, 15]}) + + ray_df = rdf.from_pandas(pandas_df, 2) + + testfuncs = [lambda x: x + x, + lambda x: str(x), + lambda x: x, + lambda x: False] + + keys = ['col1', + 'col2', + 'col3', + 'col4'] + + test_roundtrip(ray_df, pandas_df) + test_index(ray_df, pandas_df) + test_size(ray_df, pandas_df) + test_ndim(ray_df, pandas_df) + test_ftypes(ray_df, pandas_df) + test_values(ray_df, pandas_df) + test_axes(ray_df, pandas_df) + test_shape(ray_df, pandas_df) + test_add_prefix(ray_df, pandas_df) + test_add_suffix(ray_df, pandas_df) + + for testfunc in testfuncs: + test_applymap(ray_df, pandas_df, testfunc) + + test_copy(ray_df) + test_sum(ray_df, pandas_df) + test_keys(ray_df, pandas_df) + test_transpose(ray_df, pandas_df) + + for key in keys: + test_get(ray_df, pandas_df, key) + + test_get_dtype_counts(ray_df, pandas_df) + test_get_ftype_counts(ray_df, pandas_df) + + def test_add(): ray_df = create_test_dataframe()