diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index f01f9f3dc..f1922dbd4 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -5,6 +5,7 @@ from __future__ import print_function import pandas as pd import numpy as np import ray +import itertools class DataFrame(object): @@ -720,17 +721,75 @@ class DataFrame(object): limit_direction='forward', downcast=None, **kwargs): raise NotImplementedError("Not Yet implemented.") + def iterrows(self): + """Iterate over DataFrame rows as (index, Series) pairs. + + Note: + Generators can't be pickled so from the remote function + we expand the generator into a list before getting it. + This is not ideal. + + Returns: + A generator that iterates over the rows of the frame. + """ + iters = ray.get([ + _deploy_func.remote( + lambda df: list(df.iterrows()), part) for part in self._df]) + return itertools.chain.from_iterable(iters) + def items(self): - raise NotImplementedError("Not Yet implemented.") + """Iterator over (column name, Series) pairs. + + Note: + Generators can't be pickled so from the remote function + we expand the generator into a list before getting it. + This is not ideal. + + Returns: + A generator that iterates over the columns of the frame. + """ + iters = ray.get([_deploy_func.remote( + lambda df: list(df.items()), part) for part in self._df]) + + def concat_iters(iterables): + for partitions in zip(*iterables): + series = pd.concat([_series for _, _series in partitions]) + yield (series.name, series) + + return concat_iters(iters) def iteritems(self): - raise NotImplementedError("Not Yet implemented.") + """Iterator over (column name, Series) pairs. - def iterrows(self): - raise NotImplementedError("Not Yet implemented.") + Note: + Returns the same thing as .items() + + Returns: + A generator that iterates over the columns of the frame. 
+ """ + return self.items() def itertuples(self, index=True, name='Pandas'): - raise NotImplementedError("Not Yet implemented.") + """Iterate over DataFrame rows as namedtuples. + + Args: + index (boolean, default True): If True, return the index as the + first element of the tuple. + name (string, default "Pandas"): The name of the returned + namedtuples or None to return regular tuples. + Note: + Generators can't be pickled so from the remote function + we expand the generator into a list before getting it. + This is not ideal. + + Returns: + An iterator of tuples representing row data. See args for varying tuples. + """ + iters = ray.get([ + _deploy_func.remote( + lambda df: list(df.itertuples(index=index, name=name)), + part) for part in self._df]) + return itertools.chain.from_iterable(iters) def join(self, other, on=None, how='left', lsuffix='', rsuffix='', sort=False): diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 266a1954a..19462a702 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -202,6 +202,10 @@ def test_int_dataframe(): test_get_dtype_counts(ray_df, pandas_df) test_get_ftype_counts(ray_df, pandas_df) + test_iterrows(ray_df, pandas_df) + test_items(ray_df, pandas_df) + test_iteritems(ray_df, pandas_df) + test_itertuples(ray_df, pandas_df) test_max(ray_df, pandas_df) test_min(ray_df, pandas_df) @@ -274,6 +278,10 @@ def test_float_dataframe(): test_get_dtype_counts(ray_df, pandas_df) test_get_ftype_counts(ray_df, pandas_df) + test_iterrows(ray_df, pandas_df) + test_items(ray_df, pandas_df) + test_iteritems(ray_df, pandas_df) + test_itertuples(ray_df, pandas_df) def test_mixed_dtype_dataframe(): @@ -319,6 +327,11 @@ def test_mixed_dtype_dataframe(): test_get_dtype_counts(ray_df, pandas_df) test_get_ftype_counts(ray_df, pandas_df) + test_items(ray_df, pandas_df) + test_iterrows(ray_df, pandas_df) + test_items(ray_df, pandas_df) + 
test_iteritems(ray_df, pandas_df) + test_itertuples(ray_df, pandas_df) test_max(ray_df, pandas_df) test_min(ray_df, pandas_df) @@ -806,32 +819,57 @@ def test_interpolate(): ray_df.interpolate() -def test_items(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.items() +@pytest.fixture +def test_items(ray_df, pandas_df): + ray_items = ray_df.items() + pandas_items = pandas_df.items() + for ray_item, pandas_item in zip(ray_items, pandas_items): + ray_index, ray_series = ray_item + pandas_index, pandas_series = pandas_item + assert pandas_series.equals(ray_series) + assert pandas_index == ray_index -def test_iteritems(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.iteritems() +@pytest.fixture +def test_iteritems(ray_df, pandas_df): + ray_items = ray_df.iteritems() + pandas_items = pandas_df.iteritems() + for ray_item, pandas_item in zip(ray_items, pandas_items): + ray_index, ray_series = ray_item + pandas_index, pandas_series = pandas_item + assert pandas_series.equals(ray_series) + assert pandas_index == ray_index -def test_iterrows(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.iterrows() +@pytest.fixture +def test_iterrows(ray_df, pandas_df): + ray_iterrows = ray_df.iterrows() + pandas_iterrows = pandas_df.iterrows() + for ray_row, pandas_row in zip(ray_iterrows, pandas_iterrows): + ray_index, ray_series = ray_row + pandas_index, pandas_series = pandas_row + assert pandas_series.equals(ray_series) + assert pandas_index == ray_index -def test_itertuples(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_itertuples(ray_df, pandas_df): + # test default + ray_it_default = ray_df.itertuples() + pandas_it_default = pandas_df.itertuples() + for ray_row, pandas_row in zip(ray_it_default, pandas_it_default): + assert ray_row == pandas_row - with pytest.raises(NotImplementedError): - ray_df.itertuples() + # test all combinations of 
custom params + indices = [True, False] + names = [None, 'NotPandas', 'Pandas'] + + for index in indices: + for name in names: + ray_it_custom = ray_df.itertuples(index=index, name=name) + pandas_it_custom = pandas_df.itertuples(index=index, name=name) + for ray_row, pandas_row in zip(ray_it_custom, pandas_it_custom): + assert ray_row == pandas_row def test_join():