From 22d4950fae8022cb6ef92df2d4549742fb65fc60 Mon Sep 17 00:00:00 2001 From: Peter Veerman Date: Fri, 4 May 2018 10:16:05 -0700 Subject: [PATCH] [DataFrame] Implements df.pipe (#1999) * Add empty df test * Fix flake8 issues * rebase with master * reset master tests * Implement df.pipe * fix tests * Use test_pipe as a pytest.fixture * Add newline at EOF --- python/ray/dataframe/dataframe.py | 14 +++++++-- python/ray/dataframe/test/test_dataframe.py | 35 ++++++++++++++++++--- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index e9763fb70..98305f42a 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -2783,9 +2783,17 @@ class DataFrame(object): "github.com/ray-project/ray.") def pipe(self, func, *args, **kwargs): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Apply func(self, *args, **kwargs) + + Args: + func: function to apply to the df. + args: positional arguments passed into ``func``. + kwargs: a dictionary of keyword arguments passed into ``func``. + + Returns: + object: the return type of ``func``. + """ + return com._pipe(self, func, *args, **kwargs) def pivot(self, index=None, columns=None, values=None): raise NotImplementedError( diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 47468a680..42f267db3 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -262,6 +262,7 @@ def test_int_dataframe(): test_cummin(ray_df, pandas_df) test_cumprod(ray_df, pandas_df) test_cumsum(ray_df, pandas_df) + test_pipe(ray_df, pandas_df) # test_loc(ray_df, pandas_df) # test_iloc(ray_df, pandas_df) @@ -405,6 +406,7 @@ def test_float_dataframe(): test_cummin(ray_df, pandas_df) test_cumprod(ray_df, pandas_df) test_cumsum(ray_df, pandas_df) + test_pipe(ray_df, pandas_df) test___len__(ray_df, pandas_df) test_first_valid_index(ray_df, pandas_df) @@ -568,6 +570,7 @@ def test_mixed_dtype_dataframe(): test_min(ray_df, pandas_df) test_notna(ray_df, pandas_df) test_notnull(ray_df, pandas_df) + test_pipe(ray_df, pandas_df) # TODO Fix pandas so that the behavior is correct # We discovered a bug where argmax does not always give the same result @@ -718,6 +721,7 @@ def test_nan_dataframe(): test_cummin(ray_df, pandas_df) test_cumprod(ray_df, pandas_df) test_cumsum(ray_df, pandas_df) + test_pipe(ray_df, pandas_df) test___len__(ray_df, pandas_df) test_first_valid_index(ray_df, pandas_df) @@ -2151,11 +2155,34 @@ def test_pct_change(): ray_df.pct_change() -def test_pipe(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_pipe(ray_df, pandas_df): + n = len(ray_df.index) + a, b, c = 2 % n, 0, 3 % n + col = ray_df.columns[3 % len(ray_df.columns)] - with pytest.raises(NotImplementedError): - ray_df.pipe(None) + def h(x): + return x.drop(columns=[col]) + + def g(x, arg1=0): + for _ in range(arg1): + x = x.append(x) + return x + + def f(x, arg2=0, arg3=0): + return x.drop([arg2, arg3]) + + assert ray_df_equals(f(g(h(ray_df), arg1=a), arg2=b, arg3=c), + (ray_df.pipe(h) + .pipe(g, arg1=a) + .pipe(f, arg2=b, arg3=c))) + + assert ray_df_equals_pandas((ray_df.pipe(h) + .pipe(g, arg1=a) + .pipe(f, arg2=b, arg3=c)), + (pandas_df.pipe(h) + .pipe(g, arg1=a) + .pipe(f, arg2=b, arg3=c))) def test_pivot():