mirror of
https://github.com/wassname/ray.git
synced 2026-06-29 01:59:23 +08:00
[DataFrame] Implement iteritems, items, itertuples, and iterrows. (#1543)
* items * Can't pickle generator so return list * Add itterows method * Finish flak8 * Add itertuples * Some changes * Add iter tests to mixed types test * Appease flake8
This commit is contained in:
committed by
Devin Petersohn
parent
1cd2703cac
commit
fd03fb967f
@@ -5,6 +5,7 @@ from __future__ import print_function
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import ray
|
||||
import itertools
|
||||
|
||||
|
||||
class DataFrame(object):
|
||||
@@ -720,17 +721,75 @@ class DataFrame(object):
|
||||
limit_direction='forward', downcast=None, **kwargs):
|
||||
raise NotImplementedError("Not Yet implemented.")
|
||||
|
||||
def iterrows(self):
|
||||
"""Iterate over DataFrame rows as (index, Series) pairs.
|
||||
|
||||
Note:
|
||||
Generators can't be pickeled so from the remote function
|
||||
we expand the generator into a list before getting it.
|
||||
This is not that ideal.
|
||||
|
||||
Returns:
|
||||
A generator that iterates over the rows of the frame.
|
||||
"""
|
||||
iters = ray.get([
|
||||
_deploy_func.remote(
|
||||
lambda df: list(df.iterrows()), part) for part in self._df])
|
||||
return itertools.chain.from_iterable(iters)
|
||||
|
||||
def items(self):
|
||||
raise NotImplementedError("Not Yet implemented.")
|
||||
"""Iterator over (column name, Series) pairs.
|
||||
|
||||
Note:
|
||||
Generators can't be pickeled so from the remote function
|
||||
we expand the generator into a list before getting it.
|
||||
This is not that ideal.
|
||||
|
||||
Returns:
|
||||
A generator that iterates over the columns of the frame.
|
||||
"""
|
||||
iters = ray.get([_deploy_func.remote(
|
||||
lambda df: list(df.items()), part) for part in self._df])
|
||||
|
||||
def concat_iters(iterables):
|
||||
for partitions in zip(*iterables):
|
||||
series = pd.concat([_series for _, _series in partitions])
|
||||
yield (series.name, series)
|
||||
|
||||
return concat_iters(iters)
|
||||
|
||||
def iteritems(self):
|
||||
raise NotImplementedError("Not Yet implemented.")
|
||||
"""Iterator over (column name, Series) pairs.
|
||||
|
||||
def iterrows(self):
|
||||
raise NotImplementedError("Not Yet implemented.")
|
||||
Note:
|
||||
Returns the same thing as .items()
|
||||
|
||||
Returns:
|
||||
A generator that iterates over the columns of the frame.
|
||||
"""
|
||||
return self.items()
|
||||
|
||||
def itertuples(self, index=True, name='Pandas'):
|
||||
raise NotImplementedError("Not Yet implemented.")
|
||||
"""Iterate over DataFrame rows as namedtuples.
|
||||
|
||||
Args:
|
||||
index (boolean, default True): If True, return the index as the
|
||||
first element of the tuple.
|
||||
name (string, default "Pandas"): The name of the returned
|
||||
namedtuples or None to return regular tuples.
|
||||
Note:
|
||||
Generators can't be pickeled so from the remote function
|
||||
we expand the generator into a list before getting it.
|
||||
This is not that ideal.
|
||||
|
||||
Returns:
|
||||
A tuple representing row data. See args for varying tuples.
|
||||
"""
|
||||
iters = ray.get([
|
||||
_deploy_func.remote(
|
||||
lambda df: list(df.itertuples(index=index, name=name)),
|
||||
part) for part in self._df])
|
||||
return itertools.chain.from_iterable(iters)
|
||||
|
||||
def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
|
||||
sort=False):
|
||||
|
||||
@@ -202,6 +202,10 @@ def test_int_dataframe():
|
||||
|
||||
test_get_dtype_counts(ray_df, pandas_df)
|
||||
test_get_ftype_counts(ray_df, pandas_df)
|
||||
test_iterrows(ray_df, pandas_df)
|
||||
test_items(ray_df, pandas_df)
|
||||
test_iteritems(ray_df, pandas_df)
|
||||
test_itertuples(ray_df, pandas_df)
|
||||
|
||||
test_max(ray_df, pandas_df)
|
||||
test_min(ray_df, pandas_df)
|
||||
@@ -274,6 +278,10 @@ def test_float_dataframe():
|
||||
|
||||
test_get_dtype_counts(ray_df, pandas_df)
|
||||
test_get_ftype_counts(ray_df, pandas_df)
|
||||
test_iterrows(ray_df, pandas_df)
|
||||
test_items(ray_df, pandas_df)
|
||||
test_iteritems(ray_df, pandas_df)
|
||||
test_itertuples(ray_df, pandas_df)
|
||||
|
||||
|
||||
def test_mixed_dtype_dataframe():
|
||||
@@ -319,6 +327,11 @@ def test_mixed_dtype_dataframe():
|
||||
|
||||
test_get_dtype_counts(ray_df, pandas_df)
|
||||
test_get_ftype_counts(ray_df, pandas_df)
|
||||
test_items(ray_df, pandas_df)
|
||||
test_iterrows(ray_df, pandas_df)
|
||||
test_items(ray_df, pandas_df)
|
||||
test_iteritems(ray_df, pandas_df)
|
||||
test_itertuples(ray_df, pandas_df)
|
||||
|
||||
test_max(ray_df, pandas_df)
|
||||
test_min(ray_df, pandas_df)
|
||||
@@ -806,32 +819,57 @@ def test_interpolate():
|
||||
ray_df.interpolate()
|
||||
|
||||
|
||||
def test_items():
|
||||
ray_df = create_test_dataframe()
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
ray_df.items()
|
||||
@pytest.fixture
|
||||
def test_items(ray_df, pandas_df):
|
||||
ray_items = ray_df.items()
|
||||
pandas_items = pandas_df.items()
|
||||
for ray_item, pandas_item in zip(ray_items, pandas_items):
|
||||
ray_index, ray_series = ray_item
|
||||
pandas_index, pandas_series = pandas_item
|
||||
assert pandas_series.equals(ray_series)
|
||||
assert pandas_index == ray_index
|
||||
|
||||
|
||||
def test_iteritems():
|
||||
ray_df = create_test_dataframe()
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
ray_df.iteritems()
|
||||
@pytest.fixture
|
||||
def test_iteritems(ray_df, pandas_df):
|
||||
ray_items = ray_df.iteritems()
|
||||
pandas_items = pandas_df.iteritems()
|
||||
for ray_item, pandas_item in zip(ray_items, pandas_items):
|
||||
ray_index, ray_series = ray_item
|
||||
pandas_index, pandas_series = pandas_item
|
||||
assert pandas_series.equals(ray_series)
|
||||
assert pandas_index == ray_index
|
||||
|
||||
|
||||
def test_iterrows():
|
||||
ray_df = create_test_dataframe()
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
ray_df.iterrows()
|
||||
@pytest.fixture
|
||||
def test_iterrows(ray_df, pandas_df):
|
||||
ray_iterrows = ray_df.iterrows()
|
||||
pandas_iterrows = pandas_df.iterrows()
|
||||
for ray_row, pandas_row in zip(ray_iterrows, pandas_iterrows):
|
||||
ray_index, ray_series = ray_row
|
||||
pandas_index, pandas_series = pandas_row
|
||||
assert pandas_series.equals(ray_series)
|
||||
assert pandas_index == ray_index
|
||||
|
||||
|
||||
def test_itertuples():
|
||||
ray_df = create_test_dataframe()
|
||||
@pytest.fixture
|
||||
def test_itertuples(ray_df, pandas_df):
|
||||
# test default
|
||||
ray_it_default = ray_df.itertuples()
|
||||
pandas_it_default = pandas_df.itertuples()
|
||||
for ray_row, pandas_row in zip(ray_it_default, pandas_it_default):
|
||||
assert ray_row == pandas_row
|
||||
|
||||
with pytest.raises(NotImplementedError):
|
||||
ray_df.itertuples()
|
||||
# test all combinations of custom params
|
||||
indices = [True, False]
|
||||
names = [None, 'NotPandas', 'Pandas']
|
||||
|
||||
for index in indices:
|
||||
for name in names:
|
||||
ray_it_custom = ray_df.itertuples(index=index, name=name)
|
||||
pandas_it_custom = pandas_df.itertuples(index=index, name=name)
|
||||
for ray_row, pandas_row in zip(ray_it_custom, pandas_it_custom):
|
||||
assert ray_row == pandas_row
|
||||
|
||||
|
||||
def test_join():
|
||||
|
||||
Reference in New Issue
Block a user