From 8c1066cdbaa4203e68f9ceea2590f8ac0df287be Mon Sep 17 00:00:00 2001
From: Devin Petersohn
Date: Tue, 13 Mar 2018 10:06:34 -0700
Subject: [PATCH] [DataFrame] Implemented cummax, cummin, cumsum, cumprod (#1705)

* cummax, cummin, cumsum, cumprod

* added remote function

* Fix lint

* Fixing tests and linting

* Fix lint
---
Reviewer notes (this section is ignored when the patch is applied):
 * The per-partition aggregates (max/min/prod/sum) used in the axis != 1
   branch now receive the caller's skipna. Previously they always skipped
   NaN, so with skipna=False a NaN in one partition did not propagate into
   the partitions that follow it.
 * _prepend_partitions gained a docstring; the utils.py hunk header and the
   diffstat below were adjusted for the extra lines.

 python/ray/dataframe/dataframe.py           | 134 +++++++++++++++++---
 python/ray/dataframe/test/test_dataframe.py |  70 +++++-----
 python/ray/dataframe/utils.py               |  19 +++
 3 files changed, 177 insertions(+), 46 deletions(-)

diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py
index dd4dd11eb..bd9f34e43 100644
--- a/python/ray/dataframe/dataframe.py
+++ b/python/ray/dataframe/dataframe.py
@@ -24,7 +24,8 @@ from .utils import (
     _shuffle,
     _local_groupby,
     _deploy_func,
-    _compute_length_and_index)
+    _compute_length_and_index,
+    _prepend_partitions)
 
 
 class DataFrame(object):
@@ -698,24 +699,127 @@ class DataFrame(object):
             "github.com/ray-project/ray.")
 
     def cummax(self, axis=None, skipna=True, *args, **kwargs):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Perform a cumulative maximum across the DataFrame.
+
+        Args:
+            axis (int): The axis to take maximum on.
+            skipna (bool): True to skip NA values, false otherwise.
+
+        Returns:
+            The cumulative maximum of the DataFrame.
+        """
+        if axis == 1:
+            return self._map_partitions(
+                lambda df: df.cummax(axis=axis, skipna=skipna,
+                                     *args, **kwargs))
+        else:
+            local_max = [_deploy_func.remote(
+                lambda df: pd.DataFrame(df.max(skipna=skipna)).T, self._df[i])
+                for i in range(len(self._df))]
+            new_df = DataFrame(local_max, self.columns)
+            last_row_df = pd.DataFrame([df.iloc[-1, :]
+                                        for df in ray.get(new_df._df)])
+            cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
+                                                 lambda df:
+                                                 df.cummax(axis=axis,
+                                                           skipna=skipna,
+                                                           *args, **kwargs))
+                      for i in range(len(self._df))]
+            final_df = DataFrame(cum_df, self.columns)
+            return final_df
 
     def cummin(self, axis=None, skipna=True, *args, **kwargs):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Perform a cumulative minimum across the DataFrame.
+
+        Args:
+            axis (int): The axis to cummin on.
+            skipna (bool): True to skip NA values, false otherwise.
+
+        Returns:
+            The cumulative minimum of the DataFrame.
+        """
+        if axis == 1:
+            return self._map_partitions(
+                lambda df: df.cummin(axis=axis, skipna=skipna,
+                                     *args, **kwargs))
+        else:
+            local_min = [_deploy_func.remote(
+                lambda df: pd.DataFrame(df.min(skipna=skipna)).T, self._df[i])
+                for i in range(len(self._df))]
+            new_df = DataFrame(local_min, self.columns)
+            last_row_df = pd.DataFrame([df.iloc[-1, :]
+                                        for df in ray.get(new_df._df)])
+            cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
+                                                 lambda df:
+                                                 df.cummin(axis=axis,
+                                                           skipna=skipna,
+                                                           *args, **kwargs))
+                      for i in range(len(self._df))]
+            final_df = DataFrame(cum_df, self.columns)
+            return final_df
 
     def cumprod(self, axis=None, skipna=True, *args, **kwargs):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Perform a cumulative product across the DataFrame.
+
+        Args:
+            axis (int): The axis to take product on.
+            skipna (bool): True to skip NA values, false otherwise.
+
+        Returns:
+            The cumulative product of the DataFrame.
+        """
+        if axis == 1:
+            return self._map_partitions(
+                lambda df: df.cumprod(axis=axis, skipna=skipna,
+                                      *args, **kwargs))
+        else:
+            local_prod = [_deploy_func.remote(
+                lambda df: pd.DataFrame(df.prod(skipna=skipna)).T, self._df[i])
+                for i in range(len(self._df))]
+            new_df = DataFrame(local_prod, self.columns)
+            last_row_df = pd.DataFrame([df.iloc[-1, :]
+                                        for df in ray.get(new_df._df)])
+            cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
+                                                 lambda df:
+                                                 df.cumprod(axis=axis,
+                                                            skipna=skipna,
+                                                            *args, **kwargs))
+                      for i in range(len(self._df))]
+            final_df = DataFrame(cum_df, self.columns)
+            return final_df
 
     def cumsum(self, axis=None, skipna=True, *args, **kwargs):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Perform a cumulative sum across the DataFrame.
+
+        Args:
+            axis (int): The axis to take sum on.
+            skipna (bool): True to skip NA values, false otherwise.
+
+        Returns:
+            The cumulative sum of the DataFrame.
+        """
+        if axis == 1:
+            return self._map_partitions(
+                lambda df: df.cumsum(axis=axis, skipna=skipna,
+                                     *args, **kwargs))
+        else:
+            # first take the sum of each partition,
+            # append the sums of all previous partitions to current partition
+            # take cumsum and remove the appended rows
+            local_sum = [_deploy_func.remote(
+                lambda df: pd.DataFrame(df.sum(skipna=skipna)).T, self._df[i])
+                for i in range(len(self._df))]
+            new_df = DataFrame(local_sum, self.columns)
+            last_row_df = pd.DataFrame([df.iloc[-1, :]
+                                        for df in ray.get(new_df._df)])
+            cum_df = [_prepend_partitions.remote(last_row_df, i, self._df[i],
+                                                 lambda df:
+                                                 df.cumsum(axis=axis,
+                                                           skipna=skipna,
+                                                           *args, **kwargs))
+                      for i in range(len(self._df))]
+            final_df = DataFrame(cum_df, self.columns)
+            return final_df
 
     def describe(self, percentiles=None, include=None, exclude=None):
         raise NotImplementedError(
@@ -1503,7 +1607,7 @@ class DataFrame(object):
         Returns:
             The max of the DataFrame.
         """
-        if(axis == 1):
+        if axis == 1:
             return self._map_partitions(
                 lambda df: df.max(axis=axis, skipna=skipna, level=level,
                                   numeric_only=numeric_only, **kwargs))
@@ -1553,7 +1657,7 @@ class DataFrame(object):
         Returns:
             The min of the DataFrame.
         """
-        if(axis == 1):
+        if axis == 1:
             return self._map_partitions(
                 lambda df: df.min(axis=axis, skipna=skipna, level=level,
                                   numeric_only=numeric_only, **kwargs))
diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py
index 80165eae9..a9bad313e 100644
--- a/python/ray/dataframe/test/test_dataframe.py
+++ b/python/ray/dataframe/test/test_dataframe.py
@@ -238,6 +238,10 @@ def test_int_dataframe():
     test_min(ray_df, pandas_df)
     test_notna(ray_df, pandas_df)
     test_notnull(ray_df, pandas_df)
+    test_cummax(ray_df, pandas_df)
+    test_cummin(ray_df, pandas_df)
+    test_cumprod(ray_df, pandas_df)
+    test_cumsum(ray_df, pandas_df)
 
     test_loc(ray_df, pandas_df)
     test_iloc(ray_df, pandas_df)
@@ -333,6 +337,10 @@ def test_float_dataframe():
     test_min(ray_df, pandas_df)
     test_notna(ray_df, pandas_df)
     test_notnull(ray_df, pandas_df)
+    test_cummax(ray_df, pandas_df)
+    test_cummin(ray_df, pandas_df)
+    test_cumprod(ray_df, pandas_df)
+    test_cumsum(ray_df, pandas_df)
 
     test___len__(ray_df, pandas_df)
     test_first_valid_index(ray_df, pandas_df)
@@ -451,6 +459,10 @@ def test_mixed_dtype_dataframe():
     test_min(ray_df, pandas_df)
     test_notna(ray_df, pandas_df)
     test_notnull(ray_df, pandas_df)
+    test_cummax(ray_df, pandas_df)
+    test_cummin(ray_df, pandas_df)
+    # test_cumprod(ray_df, pandas_df)
+    test_cumsum(ray_df, pandas_df)
 
     test___len__(ray_df, pandas_df)
     test_first_valid_index(ray_df, pandas_df)
@@ -558,6 +570,10 @@ def test_nan_dataframe():
     test_min(ray_df, pandas_df)
     test_notna(ray_df, pandas_df)
     test_notnull(ray_df, pandas_df)
+    test_cummax(ray_df, pandas_df)
+    test_cummin(ray_df, pandas_df)
+    test_cumprod(ray_df, pandas_df)
+    test_cumsum(ray_df, pandas_df)
 
     test___len__(ray_df, pandas_df)
     test_first_valid_index(ray_df, pandas_df)
@@ -824,32 +840,24 @@ def test_cov():
         ray_df.cov()
 
 
-def test_cummax():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.cummax()
+@pytest.fixture
+def test_cummax(ray_df, pandas_df):
+    assert(ray_df_equals_pandas(ray_df.cummax(), pandas_df.cummax()))
 
 
-def test_cummin():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.cummin()
+@pytest.fixture
+def test_cummin(ray_df, pandas_df):
+    assert(ray_df_equals_pandas(ray_df.cummin(), pandas_df.cummin()))
 
 
-def test_cumprod():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.cumprod()
+@pytest.fixture
+def test_cumprod(ray_df, pandas_df):
+    assert(ray_df_equals_pandas(ray_df.cumprod(), pandas_df.cumprod()))
 
 
-def test_cumsum():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.cumsum()
+@pytest.fixture
+def test_cumsum(ray_df, pandas_df):
+    assert(ray_df_equals_pandas(ray_df.cumsum(), pandas_df.cumsum()))
 
 
 def test_describe():
@@ -1897,7 +1905,7 @@ def test_prod():
     ray_df = create_test_dataframe()
 
     with pytest.raises(NotImplementedError):
-        ray_df.prod()
+        ray_df.prod(None)
 
 
 def test_product():
@@ -1985,7 +1993,7 @@ def test_rename_sanity(num_partitions=2):
         'D': 'd'
     }
 
-    ray_df = rdf.from_pandas(test_data.frame, num_partitions)
+    ray_df = from_pandas(test_data.frame, num_partitions)
     assert ray_df_equals_pandas(
         ray_df.rename(columns=mapping),
         test_data.frame.rename(columns=mapping)
@@ -1997,7 +2005,7 @@ def test_rename_sanity(num_partitions=2):
         renamed2
     )
 
-    ray_df = rdf.from_pandas(renamed2, num_partitions)
+    ray_df = from_pandas(renamed2, num_partitions)
     assert ray_df_equals_pandas(
         ray_df.rename(columns=str.upper),
         renamed2.rename(columns=str.upper)
@@ -2010,7 +2018,7 @@ def test_rename_sanity(num_partitions=2):
 
     # gets sorted alphabetical
     df = pd.DataFrame(data)
-    ray_df = rdf.from_pandas(df, num_partitions)
+    ray_df = from_pandas(df, num_partitions)
     tm.assert_index_equal(
         ray_df.rename(index={'foo': 'bar', 'bar': 'foo'}).index,
         df.rename(index={'foo': 'bar', 'bar': 'foo'}).index
@@ -2026,7 +2034,7 @@ def test_rename_sanity(num_partitions=2):
 
     # partial columns
     renamed = test_data.frame.rename(columns={'C': 'foo', 'D': 'bar'})
-    ray_df = rdf.from_pandas(test_data.frame, num_partitions)
+    ray_df = from_pandas(test_data.frame, num_partitions)
     tm.assert_index_equal(
         ray_df.rename(columns={'C': 'foo', 'D': 'bar'}).index,
         test_data.frame.rename(columns={'C': 'foo', 'D': 'bar'}).index
@@ -2044,7 +2052,7 @@ def test_rename_sanity(num_partitions=2):
     index = pd.Index(['foo', 'bar'], name='name')
     renamer = pd.DataFrame(data, index=index)
 
-    ray_df = rdf.from_pandas(renamer, num_partitions)
+    ray_df = from_pandas(renamer, num_partitions)
     renamed = renamer.rename(index={'foo': 'bar', 'bar': 'foo'})
     ray_renamed = ray_df.rename(index={'foo': 'bar', 'bar': 'foo'})
     tm.assert_index_equal(
@@ -2062,7 +2070,7 @@ def test_rename_multiindex(num_partitions=2):
     columns = pd.MultiIndex.from_tuples(
         tuples_columns, names=['fizz', 'buzz'])
     df = pd.DataFrame([(0, 0), (1, 1)], index=index, columns=columns)
-    ray_df = rdf.from_pandas(df, num_partitions)
+    ray_df = from_pandas(df, num_partitions)
 
     #
     # without specifying level -> accross all levels
@@ -2133,7 +2141,7 @@ def test_rename_multiindex(num_partitions=2):
 @pytest.fixture
 def test_rename_nocopy(num_partitions=2):
     test_data = TestData().frame
-    ray_df = rdf.from_pandas(test_data, num_partitions)
+    ray_df = from_pandas(test_data, num_partitions)
     ray_renamed = ray_df.rename(columns={'C': 'foo'}, copy=False)
     ray_renamed['foo'] = 1
     assert (ray_df['C'] == 1).all()
@@ -2142,7 +2150,7 @@ def test_rename_nocopy(num_partitions=2):
 @pytest.fixture
 def test_rename_inplace(num_partitions=2):
     test_data = TestData().frame
-    ray_df = rdf.from_pandas(test_data, num_partitions)
+    ray_df = from_pandas(test_data, num_partitions)
 
     assert ray_df_equals_pandas(
         ray_df.rename(columns={'C': 'foo'}),
@@ -2165,7 +2173,7 @@ def test_rename_bug(num_partitions=2):
     # GH 5344
     # rename set ref_locs, and set_index was not resetting
     df = pd.DataFrame({0: ['foo', 'bar'], 1: ['bah', 'bas'], 2: [1, 2]})
-    ray_df = rdf.from_pandas(df, num_partitions)
+    ray_df = from_pandas(df, num_partitions)
     df = df.rename(columns={0: 'a'})
     df = df.rename(columns={1: 'b'})
     # TODO: Uncomment when set_index is implemented
@@ -2191,7 +2199,7 @@ def test_rename_axis():
 @pytest.fixture
 def test_rename_axis_inplace(num_partitions=2):
     test_frame = TestData().frame
-    ray_df = rdf.from_pandas(test_frame, num_partitions)
+    ray_df = from_pandas(test_frame, num_partitions)
 
     # GH 15704
     result = test_frame.copy()
diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py
index 777f1033b..fc9f40279 100644
--- a/python/ray/dataframe/utils.py
+++ b/python/ray/dataframe/utils.py
@@ -155,3 +155,22 @@ def _compute_length_and_index(dfs):
                      for j in range(lengths[i])]}
 
     return lengths, pd.DataFrame(dest_indices)
+
+
+@ray.remote
+def _prepend_partitions(last_vals, index, partition, func):
+    """Apply func to a partition preceded by rows of earlier aggregates.
+
+    Args:
+        last_vals: Frame whose first `index` rows are prepended.
+        index (int): Number of rows taken from last_vals, and the number
+            of leading rows dropped from the result.
+        partition: The partition the rows are prepended to.
+        func (callable): Function applied to the combined frame.
+
+    Returns:
+        The output of func without its first `index` rows.
+    """
+    appended_df = last_vals[:index].append(partition)
+    cum_df = func(appended_df)
+    return cum_df[index:]