[DataFrame] Apply() for Lists and Dicts (#1973)

* working for non-string functions and not lists of functions

* works with functions as strings now as well

* fixed linting errors

* throwing a warning if the input is a dictionary

* added dict of lists functionality

* fix minor indexing errors and lint

* removed some commented out code

* some comments and thoughts for apply

* cleaned up code a little bit and added todos

* improved performance

* error checking and code cleanup and comments

* small change

* improved list performance a lot

* agg calls apply for lists

* addressing comments on the PR

* col_metadata change

* updated tests to expect TypeError where appropriate
This commit is contained in:
Omkar Salpekar
2018-05-04 10:05:00 -07:00
committed by Devin Petersohn
parent cdf94c18a4
commit a1d7bb31a4
2 changed files with 82 additions and 69 deletions
+47 -22
View File
@@ -827,16 +827,7 @@ class DataFrame(object):
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
elif is_list_like(arg):
from .concat import concat
x = [self._aggregate(func, *args, **kwargs)
for func in arg]
new_dfs = [x[i] if not isinstance(x[i], pd.Series)
else pd.DataFrame(x[i], columns=[arg[i]]).T
for i in range(len(x))]
return concat(new_dfs)
return self.apply(arg, axis=_axis, args=args, **kwargs)
elif callable(arg):
self._callable_function(arg, _axis, *args, **kwargs)
else:
@@ -1047,24 +1038,58 @@ class DataFrame(object):
"""
axis = pd.DataFrame()._get_axis_number(axis)
if is_list_like(func) and not all([isinstance(obj, str)
for obj in func]):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if axis == 0 and is_list_like(func):
return self.aggregate(func, axis, *args, **kwds)
if isinstance(func, compat.string_types):
if axis == 1:
kwds['axis'] = axis
return getattr(self, func)(*args, **kwds)
elif isinstance(func, dict):
if axis == 1:
raise TypeError(
"(\"'dict' object is not callable\", "
"'occurred at index {0}'".format(self.index[0]))
if len(self.columns) != len(set(self.columns)):
warnings.warn(
'duplicate column names not supported with apply().',
FutureWarning, stacklevel=2)
has_list = list in map(type, func.values())
part_ind_tuples = [(self._col_metadata[key], key) for key in func]
if has_list:
# if input dict has a list, the function to apply must wrap
# single functions in lists as well to get the desired output
# format
result = [_deploy_func.remote(
lambda df: df.iloc[:, ind].apply(
func[key] if is_list_like(func[key])
else [func[key]]),
self._col_partitions[part])
for (part, ind), key in part_ind_tuples]
return pd.concat(ray.get(result), axis=1)
else:
result = [_deploy_func.remote(
lambda df: df.iloc[:, ind].apply(func[key]),
self._col_partitions[part])
for (part, ind), key in part_ind_tuples]
return pd.Series(ray.get(result), index=func.keys())
elif is_list_like(func):
if axis == 1:
raise TypeError(
"(\"'list' object is not callable\", "
"'occurred at index {0}'".format(self.index[0]))
# TODO: some checking on functions that return Series or Dataframe
new_cols = _map_partitions(lambda df: df.apply(func),
self._col_partitions)
# resolve function names for the DataFrame index
new_index = [f_name if isinstance(f_name, compat.string_types)
else f_name.__name__ for f_name in func]
return DataFrame(col_partitions=new_cols,
columns=self.columns,
index=new_index,
col_metadata=self._col_metadata)
elif callable(func):
return self._callable_function(func, axis=axis, *args, **kwds)
else:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def as_blocks(self, copy=True):
raise NotImplementedError(
+35 -47
View File
@@ -302,25 +302,22 @@ def test_int_dataframe():
test_apply(ray_df, pandas_df, func, 1)
test_aggregate(ray_df, pandas_df, func, 1)
else:
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
@@ -464,25 +461,22 @@ def test_float_dataframe():
test_apply(ray_df, pandas_df, func, 1)
test_aggregate(ray_df, pandas_df, func, 1)
else:
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
@@ -632,17 +626,14 @@ def test_mixed_dtype_dataframe():
test_agg(ray_df, pandas_df, func, 0)
func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
@@ -782,25 +773,22 @@ def test_nan_dataframe():
test_apply(ray_df, pandas_df, func, 1)
test_aggregate(ray_df, pandas_df, func, 1)
else:
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
func = ['sum', lambda df: df.sum()]
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_aggregate(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(NotImplementedError):
test_apply(ray_df, pandas_df, func, 0)
test_aggregate(ray_df, pandas_df, func, 0)
test_agg(ray_df, pandas_df, func, 0)
with pytest.raises(TypeError):
test_apply(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_aggregate(ray_df, pandas_df, func, 1)
with pytest.raises(NotImplementedError):
with pytest.raises(TypeError):
test_agg(ray_df, pandas_df, func, 1)
test_transform(ray_df, pandas_df)
@@ -855,8 +843,8 @@ def test_comparison_inter_ops(op):
ray_df2 = rdf.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
pandas_df2 = pd.DataFrame({"A": [0, 2], "col1": [0, 19], "col2": [1, 1]})
ray_df_equals_pandas(getattr(ray_df, op)(ray_df2),
getattr(pandas_df, op)(pandas_df2))
ray_df_equals_pandas(getattr(ray_df2, op)(ray_df2),
getattr(pandas_df2, op)(pandas_df2))
@pytest.fixture