From fa37564511d2b29e27757bd4c07db2d114c34389 Mon Sep 17 00:00:00 2001
From: Devin Petersohn
Date: Wed, 7 Feb 2018 15:43:45 -0800
Subject: [PATCH] [DataFrame] Implementation for head, idxmax, idxmin, pop, tail, and Ray Index (#1520)

* Adding head implementation
* Adding idxmax, idxmin, pop, tail
* Adding index skeleton
* Addressing reviewer comments
* Fixing tests to reflect Series constructor changes
---
Review notes (ignored by git am; not part of the commit message):
  * head/tail size each partition by row count, len(df). DataFrame.size is
    rows * columns and over-counts any partition with more than one column.
  * tail trims the boundary partition with df.tail(n) instead of df.head(n).
  * Index.to_pandas is a staticmethod; as a classmethod its only parameter
    received the class rather than the list of indices.
  * Line breaks and indentation were restored; hunk line counts and the
    diffstat are unchanged. The blob ids on the "index" lines predate the
    fixes above.

 python/ray/dataframe/dataframe.py           | 94 +++++++++++++++++++--
 python/ray/dataframe/index.py               | 21 +++++
 python/ray/dataframe/series.py              |  8 ++
 python/ray/dataframe/test/test_dataframe.py | 57 +++++++------
 python/ray/dataframe/test/test_series.py    |  2 +-
 5 files changed, 151 insertions(+), 31 deletions(-)
 create mode 100644 python/ray/dataframe/index.py

diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py
index 3089081ec..a8f61a366 100644
--- a/python/ray/dataframe/dataframe.py
+++ b/python/ray/dataframe/dataframe.py
@@ -614,7 +614,28 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def head(self, n=5):
-        raise NotImplementedError("Not Yet implemented.")
+        """Get the first n rows of the dataframe.
+
+        Args:
+            n (int): The number of rows to return.
+
+        Returns:
+            A new dataframe with the first n rows of the dataframe.
+        """
+        sizes = ray.get(self._map_partitions(lambda df: len(df))._df)
+        new_dfs = []
+        i = 0
+        while n > 0 and i < len(self._df):
+            if (n - sizes[i]) < 0:
+                new_dfs.append(_deploy_func.remote(lambda df: df.head(n),
+                                                   self._df[i]))
+                break
+            else:
+                new_dfs.append(self._df[i])
+                n -= sizes[i]
+                i += 1
+
+        return DataFrame(new_dfs, self.columns)
 
     def hist(self, data, column=None, by=None, grid=True, xlabelsize=None,
              xrot=None, ylabelsize=None, yrot=None, ax=None, sharex=False,
@@ -622,10 +643,38 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def idxmax(self, axis=0, skipna=True):
-        raise NotImplementedError("Not Yet implemented.")
+        """Get the index of the first occurrence of the max value of the axis.
+
+        Args:
+            axis (int): Identify the max over the rows (1) or columns (0).
+            skipna (bool): Whether or not to skip NA values.
+
+        Returns:
+            A Series with the index for each maximum value for the axis
+                specified.
+        """
+        if axis == 1:
+            return to_pandas(self._map_partitions(
+                lambda df: df.idxmax(axis=axis, skipna=skipna)))
+        else:
+            return self.T.idxmax(axis=1, skipna=skipna)
 
     def idxmin(self, axis=0, skipna=True):
-        raise NotImplementedError("Not Yet implemented.")
+        """Get the index of the first occurrence of the min value of the axis.
+
+        Args:
+            axis (int): Identify the min over the rows (1) or columns (0).
+            skipna (bool): Whether or not to skip NA values.
+
+        Returns:
+            A Series with the index for each minimum value for the axis
+                specified.
+        """
+        if axis == 1:
+            return to_pandas(self._map_partitions(
+                lambda df: df.idxmin(axis=axis, skipna=skipna)))
+        else:
+            return self.T.idxmin(axis=1, skipna=skipna)
 
     def infer_objects(self):
         raise NotImplementedError("Not Yet implemented.")
@@ -771,7 +820,20 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def pop(self, item):
-        raise NotImplementedError("Not Yet implemented.")
+        """Pops an item from this DataFrame and returns it.
+
+        Args:
+            item (str): Column label to be popped
+
+        Returns:
+            A Series containing the popped values. Also modifies this
+            DataFrame.
+        """
+
+        popped = to_pandas(self._map_partitions(
+            lambda df: df.pop(item)))
+        self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df
+        return popped
 
     def pow(self, other, axis='columns', level=None, fill_value=None):
         raise NotImplementedError("Not Yet implemented.")
@@ -934,7 +996,29 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def tail(self, n=5):
-        raise NotImplementedError("Not Yet implemented.")
+        """Get the last n rows of the dataframe.
+
+        Args:
+            n (int): The number of rows to return.
+
+        Returns:
+            A new dataframe with the last n rows of this dataframe.
+        """
+        sizes = ray.get(self._map_partitions(lambda df: len(df))._df)
+        new_dfs = []
+        i = len(self._df) - 1
+        while n > 0 and i >= 0:
+            if (n - sizes[i]) < 0:
+                new_dfs.append(_deploy_func.remote(lambda df: df.tail(n),
+                                                   self._df[i]))
+                break
+            else:
+                new_dfs.append(self._df[i])
+                n -= sizes[i]
+                i -= 1
+        # we were adding in reverse order, so make it right.
+        new_dfs.reverse()
+        return DataFrame(new_dfs, self.columns)
 
     def take(self, indices, axis=0, convert=None, is_copy=True, **kwargs):
         raise NotImplementedError("Not Yet implemented.")
diff --git a/python/ray/dataframe/index.py b/python/ray/dataframe/index.py
new file mode 100644
index 000000000..790c53f7e
--- /dev/null
+++ b/python/ray/dataframe/index.py
@@ -0,0 +1,21 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import pandas as pd
+
+
+class Index(object):
+
+    def __init__(self, idx):
+        self.idx = idx
+
+    @staticmethod
+    def to_pandas(indices):
+        if isinstance(indices[0], pd.RangeIndex):
+            merged = indices[0]
+            for index in indices[1:]:
+                merged = merged.union(index)
+            return merged
+        else:
+            return indices[0].append(indices[1:])
diff --git a/python/ray/dataframe/series.py b/python/ray/dataframe/series.py
index 91ac792f2..eea7f7bbb 100644
--- a/python/ray/dataframe/series.py
+++ b/python/ray/dataframe/series.py
@@ -13,6 +13,14 @@ def na_op():
 
 class Series(object):
 
+    def __init__(self, series_oids):
+        """Constructor for a Series object.
+
+        Args:
+            series_oids ([ObjectID]): The list of remote Series objects.
+        """
+        self.series_oids = series_oids
+
     @property
     def T(self):
         raise NotImplementedError("Not Yet implemented.")
diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py
index d0c168a1e..e4ab722cb 100644
--- a/python/ray/dataframe/test/test_dataframe.py
+++ b/python/ray/dataframe/test/test_dataframe.py
@@ -165,6 +165,11 @@ def test_int_dataframe():
     test___deepcopy__(ray_df, pandas_df)
     test_bool(ray_df, pandas_df)
     test_count(ray_df, pandas_df)
+    test_head(ray_df, pandas_df)
+    test_tail(ray_df, pandas_df)
+    test_idxmax(ray_df, pandas_df)
+    test_idxmin(ray_df, pandas_df)
+    test_pop(ray_df, pandas_df)
 
 
 def test_float_dataframe():
@@ -212,6 +217,11 @@ def test_float_dataframe():
     test___deepcopy__(ray_df, pandas_df)
     test_bool(ray_df, pandas_df)
     test_count(ray_df, pandas_df)
+    test_head(ray_df, pandas_df)
+    test_tail(ray_df, pandas_df)
+    test_idxmax(ray_df, pandas_df)
+    test_idxmin(ray_df, pandas_df)
+    test_pop(ray_df, pandas_df)
 
 
 def test_add():
@@ -663,11 +673,9 @@ def test_gt():
         ray_df.gt(None)
 
 
-def test_head():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.head()
+@pytest.fixture
+def test_head(ray_df, pandas_df):
+    ray_df_equals_pandas(ray_df.head(), pandas_df.head())
 
 
 def test_hist():
@@ -677,18 +685,16 @@ def test_hist():
         ray_df.hist(None)
 
 
-def test_idxmax():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.idxmax()
+@pytest.fixture
+def test_idxmax(ray_df, pandas_df):
+    assert \
+        ray_df.idxmax().sort_index().equals(pandas_df.idxmax().sort_index())
 
 
-def test_idxmin():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.idxmin()
+@pytest.fixture
+def test_idxmin(ray_df, pandas_df):
+    assert \
+        ray_df.idxmin().sort_index().equals(pandas_df.idxmin().sort_index())
 
 
 def test_infer_objects():
@@ -971,11 +977,14 @@ def test_plot():
         ray_df.plot()
 
 
-def test_pop():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.pop(None)
+@pytest.fixture
+def test_pop(ray_df, pandas_df):
+    temp_ray_df = ray_df._map_partitions(lambda df: df)
+    temp_pandas_df = pandas_df.copy()
+    ray_popped = temp_ray_df.pop('col2')
+    pandas_popped = temp_pandas_df.pop('col2')
+    assert ray_popped.sort_index().equals(pandas_popped.sort_index())
+    ray_df_equals_pandas(temp_ray_df, temp_pandas_df)
 
 
 def test_pow():
@@ -1292,11 +1301,9 @@ def test_swaplevel():
         ray_df.swaplevel()
 
 
-def test_tail():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.tail()
+@pytest.fixture
+def test_tail(ray_df, pandas_df):
+    ray_df_equals_pandas(ray_df.tail(), pandas_df.tail())
 
 
 def test_take():
diff --git a/python/ray/dataframe/test/test_series.py b/python/ray/dataframe/test/test_series.py
index 7c8b33d8e..7db0d4d35 100644
--- a/python/ray/dataframe/test/test_series.py
+++ b/python/ray/dataframe/test/test_series.py
@@ -11,7 +11,7 @@ ray.init()
 
 @pytest.fixture
 def create_test_series():
-    return rdf.Series()
+    return rdf.Series(None)
 
 
 def test_T():