[DataFrame] Implements filter and dropna (#1959)

* implement filter

* begin implementation of dropna

* implement dropna

* docs and tests

* resolving comments

* resolving merge

* add error checking to dropna

* fix update inplace call

* Implement multiple axis for dropna (#13)

* Implement multiple axis for dropna

* Add multiple axis dropna test

* Fix using dummy_frame in dropna

* Clean up dropna multiple axis tests

* remove unnecessary axis modification

* Clean up dropna tests

* resolve comments

* fix lint
This commit is contained in:
Kunal Gosar
2018-05-04 12:21:16 -07:00
committed by Devin Petersohn
parent 22d4950fae
commit 4030356b51
3 changed files with 248 additions and 11 deletions
+133 -7
View File
@@ -9,7 +9,7 @@ from pandas.core.index import _ensure_index_from_sequences
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas import compat
from pandas.compat import lzip, string_types, cPickle as pkl
from pandas.compat import lzip, to_str, string_types, cPickle as pkl
import pandas.core.common as com
from pandas.core.dtypes.common import (
is_bool_dtype,
@@ -756,7 +756,8 @@ class DataFrame(object):
T = property(transpose)
def dropna(self, axis=0, how='any', thresh=None, subset=None,
           inplace=False):
    """Create a new DataFrame from the removed NA values from this one.

    Args:
        axis (int, str, or list-like of int/str): Axis or axes to drop
            labels from. Every entry is resolved through pandas'
            `_get_axis_number`. When list-like, the axes are processed
            one after another, each on the result of the previous one.
        how (str): 'any' or 'all'. Forwarded unchanged to the `dropna`
            call made on every partition. May be None only if `thresh`
            is given.
        thresh (int): Forwarded unchanged to the `dropna` call made on
            every partition.
        subset (list-like): Labels along the other axis to consider.
            Labels that do not exist on that axis are silently ignored.
        inplace (bool): If True, update this DataFrame instead of
            returning a new one.

    Returns:
        If inplace is set to True, returns None, otherwise returns a new
        DataFrame with the dropna applied.

    Raises:
        ValueError: If `how` is neither None, 'any' nor 'all'.
        TypeError: If both `how` and `thresh` are None.
    """
    inplace = validate_bool_kwarg(inplace, "inplace")

    if is_list_like(axis):
        axis = [pd.DataFrame()._get_axis_number(ax) for ax in axis]

        result = self
        # TODO(kunalgosar): this builds an intermediate dataframe,
        # which does unnecessary computation
        for ax in axis:
            result = result.dropna(
                axis=ax, how=how, thresh=thresh, subset=subset)
        if not inplace:
            return result

        self._update_inplace(block_partitions=result._block_partitions,
                             columns=result.columns,
                             index=result.index)
        return None

    axis = pd.DataFrame()._get_axis_number(axis)

    # Validate before any remote work is submitted.
    if how is not None and how not in ['any', 'all']:
        raise ValueError('invalid how option: %s' % how)
    if how is None and thresh is None:
        raise TypeError('must specify how or thresh')

    if subset is not None:
        # Keep only the requested labels that exist on the other axis,
        # in the order they appear on that axis.
        subset = set(subset)
        if axis == 1:
            subset = [item for item in self.index if item in subset]
        else:
            subset = [item for item in self.columns if item in subset]

    def dropna_helper(df):
        # Runs once per partition: drops NA, then returns the partition
        # with the dropped-from axis relabelled positionally, together
        # with the labels that survived on that axis.
        # NOTE(review): the surviving labels are later handed to
        # `get_global_indices`, so they are presumably partition-local
        # integer positions -- confirm against the partition layout.
        new_df = df.dropna(axis=axis, how=how, thresh=thresh,
                           subset=subset, inplace=False)

        if axis == 1:
            new_index = new_df.columns
            new_df.columns = pd.RangeIndex(0, len(new_df.columns))
        else:
            new_index = new_df.index
            new_df.reset_index(drop=True, inplace=True)

        return new_df, new_index

    parts = self._col_partitions if axis == 1 else self._row_partitions
    result = [_deploy_func._submit(args=(dropna_helper, df),
                                   num_return_vals=2) for df in parts]
    # Split the (partition, surviving labels) pairs into two lists.
    new_parts, new_vals = [list(t) for t in zip(*result)]

    if axis == 1:
        new_vals = [self._col_metadata.get_global_indices(i, vals)
                    for i, vals in enumerate(ray.get(new_vals))]

        # This flattens the 2d array to 1d
        new_vals = [i for j in new_vals for i in j]
        new_cols = self.columns[new_vals]

        if not inplace:
            return DataFrame(col_partitions=new_parts,
                             columns=new_cols,
                             index=self.index)

        self._update_inplace(col_partitions=new_parts,
                             columns=new_cols,
                             index=self.index)

    else:
        new_vals = [self._row_metadata.get_global_indices(i, vals)
                    for i, vals in enumerate(ray.get(new_vals))]

        # This flattens the 2d array to 1d
        new_vals = [i for j in new_vals for i in j]
        new_rows = self.index[new_vals]

        if not inplace:
            return DataFrame(row_partitions=new_parts,
                             index=new_rows,
                             columns=self.columns)

        self._update_inplace(row_partitions=new_parts,
                             index=new_rows,
                             columns=self.columns)

    return None
def add(self, other, axis='columns', level=None, fill_value=None):
"""Add this DataFrame to another or a scalar/list.
@@ -1797,9 +1885,45 @@ class DataFrame(object):
return new_obj
def filter(self, items=None, like=None, regex=None, axis=None):
    """Subset rows or columns based on their labels

    Args:
        items (list): list of labels to subset
        like (string): retain labels where `arg in label == True`
        regex (string): retain labels matching regex input
        axis: axis to filter on; None selects 'columns'

    Returns:
        A new dataframe with the filter applied.

    Raises:
        TypeError: If more than one, or none, of `items`, `like` and
            `regex` is given.
    """
    # Counted directly so the check does not rely on a private pandas
    # helper.
    nkw = sum(arg is not None for arg in (items, like, regex))
    if nkw > 1:
        raise TypeError('Keyword arguments `items`, `like`, or `regex` '
                        'are mutually exclusive')
    if nkw == 0:
        raise TypeError('Must pass either `items`, `like`, or `regex`')

    if axis is None:
        axis = 'columns'  # This is the default info axis for dataframes

    axis = pd.DataFrame()._get_axis_number(axis)
    labels = self.columns if axis else self.index

    if items is not None:
        bool_arr = labels.isin(items)
    else:
        if like is not None:
            def keep(label):
                return like in to_str(label)
        else:
            # Compile once, before the closure that uses the pattern.
            matcher = re.compile(regex)

            def keep(label):
                return matcher.search(to_str(label)) is not None
        bool_arr = labels.map(keep).tolist()

    if not axis:
        # Row axis: select rows with the boolean mask.
        return self[bool_arr]
    return self[self.columns[bool_arr]]
def first(self, offset):
raise NotImplementedError(
@@ -3990,7 +4114,9 @@ class DataFrame(object):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = [col for col in self.col if col in set(columns)]
indices_for_rows = \
[i for i, item in enumerate(self.columns)
if item in set(columns)]
new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
+7
View File
@@ -271,6 +271,13 @@ class _IndexMetadata(object):
# Return inserted coordinate for callee
return coord_to_insert
def get_global_indices(self, partition, index_within_partition_list):
    """Shift positions inside one partition to positions across all of them.

    Args:
        partition: Number of the partition the positions belong to.
        index_within_partition_list: Iterable of positions inside that
            partition.

    Returns:
        A list in which every position is offset by the summed
        `self._lengths` entries of the partitions before `partition`.
    """
    offset = sum(self._lengths[k] for k in range(partition))
    return [offset + local for local in index_within_partition_list]
def squeeze(self, partition, index_within_partition):
"""Prepare a single coordinate for removal by "squeezing" the
subsequent coordinates "up" one index within that partition. To be used
+108 -4
View File
@@ -190,6 +190,11 @@ def test_int_dataframe():
'col3',
'col4']
filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}
test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
@@ -348,6 +353,11 @@ def test_float_dataframe():
'col3',
'col4']
filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}
test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
@@ -506,6 +516,11 @@ def test_mixed_dtype_dataframe():
'col3',
'col4']
filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}
test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
@@ -664,6 +679,11 @@ def test_nan_dataframe():
'col3',
'col4']
filter_by = {'items': ['col1', 'col5'],
'regex': '4$|3$',
'like': 'col'}
test_filter(ray_df, pandas_df, filter_by)
test_roundtrip(ray_df, pandas_df)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
@@ -798,6 +818,23 @@ def test_nan_dataframe():
test_transform(ray_df, pandas_df)
def test_dense_nan_df():
    """Run the dropna checks on a small frame that is mostly NaN."""
    nan = np.nan
    rows = [[nan, 2, nan, 0],
            [3, 4, nan, 1],
            [nan, nan, nan, 5]]
    labels = list('ABCD')

    # The same data backs both frames so they can be compared directly.
    ray_df = rdf.DataFrame(rows, columns=labels)
    pd_df = pd.DataFrame(rows, columns=labels)

    for check in (test_dropna,
                  test_dropna_inplace,
                  test_dropna_multiple_axes,
                  test_dropna_multiple_axes_inplace):
        check(ray_df, pd_df)
@pytest.fixture
def test_inter_df_math(op, simple=False):
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
@@ -1252,6 +1289,68 @@ def test_drop_duplicates():
ray_df.drop_duplicates()
@pytest.fixture
def test_dropna(ray_df, pd_df):
    """Check that non-inplace dropna matches pandas for several options."""
    option_sets = [
        {'axis': 1, 'how': 'all'},
        {'axis': 1, 'how': 'any'},
        {'axis': 0, 'how': 'all'},
        {'thresh': 2},
    ]
    for options in option_sets:
        assert ray_df_equals_pandas(ray_df.dropna(**options),
                                    pd_df.dropna(**options))
@pytest.fixture
def test_dropna_inplace(ray_df, pd_df):
    """Check that successive inplace dropna calls match pandas."""
    # Work on copies so the caller's frames stay untouched.
    ray_df, pd_df = ray_df.copy(), pd_df.copy()

    # The second step runs on the result of the first.
    for options in ({'thresh': 2}, {'axis': 1, 'how': 'any'}):
        ray_df.dropna(inplace=True, **options)
        pd_df.dropna(inplace=True, **options)
        assert ray_df_equals_pandas(ray_df, pd_df)
@pytest.fixture
def test_dropna_multiple_axes(ray_df, pd_df):
    """Check dropna with a list and a tuple of axes against pandas."""
    for axes in ([0, 1], (0, 1)):
        ray_result = ray_df.dropna(how='all', axis=axes)
        pd_result = pd_df.dropna(how='all', axis=axes)
        assert ray_df_equals_pandas(ray_result, pd_result)
@pytest.fixture
def test_dropna_multiple_axes_inplace(ray_df, pd_df):
    """Check inplace dropna with a list and a tuple of axes against pandas."""
    for axes in ([0, 1], (0, 1)):
        # Fresh copies per case: each inplace call must start from the
        # original data.
        ray_df_copy = ray_df.copy()
        pd_df_copy = pd_df.copy()

        ray_df_copy.dropna(how='all', axis=axes, inplace=True)
        pd_df_copy.dropna(how='all', axis=axes, inplace=True)

        assert ray_df_equals_pandas(ray_df_copy, pd_df_copy)
def test_duplicated():
ray_df = create_test_dataframe()
@@ -1747,11 +1846,16 @@ def test_fillna_datetime_columns(num_partitions=2):
"""
@pytest.fixture
def test_filter(ray_df, pandas_df, by):
    """Check that filter matches pandas for `items`, `regex` and `like`.

    Args:
        ray_df: The frame under test.
        pandas_df: The pandas frame holding the same data.
        by: Dict with the keys 'items', 'regex' and 'like', each mapping
            to the argument passed for that keyword.
    """
    for keyword in ('items', 'regex', 'like'):
        options = {keyword: by[keyword]}
        # The comparison result has to be asserted; otherwise a mismatch
        # goes unnoticed.
        assert ray_df_equals_pandas(ray_df.filter(**options),
                                    pandas_df.filter(**options))
def test_first():