[DataFrame] Implement .fillna(), .ffill(), .bfill(), .eval(), and .drop() (#1544)

* Implement ray.DataFrame.drop w/ tests

* Implement ray.DataFrame.eval w/ tests

Fix flake8 issues

* Fix flake8 issues in dataframe.py

* Implement fillna

* Implement fillna

* Implement ffill and bfill

* Define helper functions outside of method invocation

* Implement ray.DataFrame.eval w/ tests

* Index update

* Fixed transpose bug with nan values

* Fix lint

* Implement fillna

* Use ray index to check if labels exist in df

* Fix ValueError catching

* Remove duplicate test methods

* Add documentation for .fillna(), .ffill(), .bfill(), .eval(), and .drop()

Fix flake8 errors

* Remove notebook files

* Change fillna, eval, drop to use new index type

* Fix documentation for fillna, eval and drop

temp

Temp

temp

temp

temp

* Update drop to work with new type of ray index

* Fix flake8 errors

* Refactor fillna fix for index
This commit is contained in:
Peter Veerman
2018-03-09 07:37:27 -08:00
committed by Devin Petersohn
parent 7193107f32
commit 2b747ba46c
2 changed files with 840 additions and 42 deletions
+291 -23
View File
@@ -232,15 +232,16 @@ class DataFrame(object):
"""
assert(len(df) > 0)
if df:
if df is not None:
self._df = df
if columns:
if columns is not None:
self.columns = columns
if index:
self.index = index
self._lengths, self._index = _compute_length_and_index.remote(self._df)
if index is not None:
self.index = index
def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
@@ -458,8 +459,6 @@ class DataFrame(object):
DataFrame with the dropna applied.
"""
raise NotImplementedError("Not yet")
if how != 'any' and how != 'all':
raise ValueError("<how> not correctly set.")
def add(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
@@ -579,9 +578,16 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def bfill(self, axis=None, inplace=False, limit=None, downcast=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Synonym for DataFrame.fillna(method='bfill')
"""
new_df = self.fillna(
method='bfill', axis=axis, limit=limit, downcast=downcast
)
if inplace:
self._df = new_df._df
self.columns = new_df.columns
else:
return new_df
def bool(self):
"""Return the bool of a single element PandasObject.
@@ -731,9 +737,100 @@ class DataFrame(object):
def drop(self, labels=None, axis=0, index=None, columns=None, level=None,
inplace=False, errors='raise'):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Return new object with labels in requested axis removed.
Args:
labels: Index or column labels to drop.
axis: Whether to drop labels from the index (0 / index) or
columns (1 / columns).
index, columns: Alternative to specifying axis (labels, axis=1 is
equivalent to columns=labels).
level: For MultiIndex
inplace: If True, do operation inplace and return None.
errors: If ignore, suppress error and existing labels are
dropped.
Returns:
dropped : type of caller
"""
# inplace = validate_bool_kwarg(inplace, "inplace")
if labels is not None:
if index is not None or columns is not None:
raise ValueError("Cannot specify both 'labels' and "
"'index'/'columns'")
elif index is None and columns is None:
raise ValueError("Need to specify at least one of 'labels', "
"'index' or 'columns'")
new_df = self
is_axis_zero = axis is None or axis == 0 or axis == 'index'\
or axis == 'rows'
try:
if (is_axis_zero and columns is None) or index is not None:
values = labels if labels is not None else index
try:
try:
if len(values) == 0:
if inplace:
return
else:
return self
filtered_index = self._index.loc[list(values)]
except TypeError:
filtered_index = self._index.loc[[values]]
except KeyError:
raise ValueError(
"{} is not contained in the index".format(labels))
filtered_index.dropna(inplace=True)
partition_idx = [
filtered_index.loc[
filtered_index['partition'] == i
]['index_within_partition']
for i in range(len(self._df))
]
new_df = [
_deploy_func.remote(
lambda df, new_labels: df.drop(
new_labels, level=level, errors='ignore'),
self._df[i], partition_idx[i]
)
for i in range(len(self._df))
]
new_index = self._index.copy().drop(values, errors=errors)
new_df = DataFrame(new_df, self.columns, index=new_index.index)
except (ValueError, KeyError):
if errors == 'raise':
raise
new_df = self
try:
if not is_axis_zero or columns is not None:
values = labels if labels else columns
new_df = new_df._map_partitions(
lambda df: df.drop(
values, axis=1, level=level, errors='ignore')
)
new_columns = self.columns.to_series().drop(values,
errors=errors)
new_df.columns = pd.Index(new_columns)
except (ValueError, KeyError):
if errors == 'raise':
raise
new_df = self
if inplace:
self._update_inplace(
df=new_df._df,
index=new_df.index,
columns=new_df.columns
)
else:
return new_df
def drop_duplicates(self, subset=None, keep='first', inplace=False):
raise NotImplementedError(
@@ -784,9 +881,61 @@ class DataFrame(object):
return True
def eval(self, expr, inplace=False, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Evaluate a Python expression as a string using various backends.
Args:
expr: The expression to evaluate. This string cannot contain any
Python statements, only Python expressions.
parser: The parser to use to construct the syntax tree from the
expression. The default of 'pandas' parses code slightly
different than standard Python. Alternatively, you can parse
an expression using the 'python' parser to retain strict
Python semantics. See the enhancing performance documentation
for more details.
engine: The engine used to evaluate the expression.
truediv: Whether to use true division, like in Python >= 3
local_dict: A dictionary of local variables, taken from locals()
by default.
global_dict: A dictionary of global variables, taken from
globals() by default.
resolvers: A list of objects implementing the __getitem__ special
method that you can use to inject an additional collection
of namespaces to use for variable lookup. For example, this is
used in the query() method to inject the index and columns
variables that refer to their respective DataFrame instance
attributes.
level: The number of prior stack frames to traverse and add to
the current scope. Most users will not need to change this
parameter.
target: This is the target object for assignment. It is used when
there is variable assignment in the expression. If so, then
target must support item assignment with string keys, and if a
copy is being returned, it must also support .copy().
inplace: If target is provided, and the expression mutates target,
whether to modify target inplace. Otherwise, return a copy of
target with the mutation.
Returns:
ndarray, numeric scalar, DataFrame, Series
"""
inplace = validate_bool_kwarg(inplace, "inplace")
new_df = self._map_partitions(lambda df: df.eval(expr, inplace=False,
**kwargs))
new_df.columns = new_df.columns.insert(self.columns.size, 'e')
if inplace:
# TODO: return ray series instead of ray df
self.e = new_df.drop(columns=self.columns)
self._df = new_df._df
self.columns = new_df.columns
else:
return new_df
def ewm(self, com=None, span=None, halflife=None, alpha=None,
min_periods=0, freq=None, adjust=True, ignore_na=False, axis=0):
@@ -800,15 +949,136 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def ffill(self, axis=None, inplace=False, limit=None, downcast=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Synonym for DataFrame.fillna(method='ffill')
"""
new_df = self.fillna(
method='ffill', axis=axis, limit=limit, downcast=downcast
)
if inplace:
self._df = new_df._df
self.columns = new_df.columns
else:
return new_df
def fillna(self, value=None, method=None, axis=None, inplace=False,
limit=None, downcast=None, **kwargs):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Fill NA/NaN values using the specified method.
Args:
value: Value to use to fill holes. This value cannot be a list.
method: Method to use for filling holes in reindexed Series pad.
ffill: propagate last valid observation forward to next valid
backfill.
bfill: use NEXT valid observation to fill gap.
axis: 0 or index, 1 or columns.
inplace: If True, fill in place. Note: this will modify any other
views on this object.
limit: If method is specified, this is the maximum number of
consecutive NaN values to forward/backward fill. In other
words, if there is a gap with more than this number of
consecutive NaNs, it will only be partially filled. If method
is not specified, this is the maximum number of entries along
the entire axis where NaNs will be filled. Must be greater
than 0 if not None.
downcast: A dict of item->dtype of what to downcast if possible,
or the string infer which will try to downcast to an
appropriate equal type.
Returns:
filled: DataFrame
"""
if isinstance(value, (list, tuple)):
raise TypeError('"value" parameter must be a scalar or dict, but '
'you passed a "{0}"'.format(type(value).__name__))
if value is None and method is None:
raise ValueError('must specify a fill method or value')
if value is not None and method is not None:
raise ValueError('cannot specify both a fill method and value')
if method is not None and method not in ['backfill', 'bfill', 'pad',
'ffill']:
expecting = 'pad (ffill) or backfill (bfill)'
msg = 'Invalid fill method. Expecting {expecting}. Got {method}'\
.format(expecting=expecting, method=method)
raise ValueError(msg)
partition_idx = [
self._index.loc[
self._index['partition'] == i
].index
for i in range(len(self._df))
]
def fillna_part(df, real_index):
old_index = df.index
df.index = real_index
new_df = df.fillna(value=value, method=method, axis=axis,
limit=limit, downcast=downcast, **kwargs)
new_df.index = old_index
return new_df
new_df = [
_deploy_func.remote(
fillna_part,
part, partition_idx[i]
)
for i, part in enumerate(self._df)
]
new_df = DataFrame(new_df, self.columns, self.index)
is_bfill = method is not None and method in ['backfill', 'bfill']
is_ffill = method is not None and method in ['pad', 'ffill']
is_axis_zero = axis is None or axis == 0 or axis == 'index'\
or axis == 'rows'
if is_axis_zero and (is_bfill or is_ffill):
def fill_in_part(part, row):
return part.fillna(value=row, axis=axis, limit=limit,
downcast=downcast, **kwargs)
last_row_df = None
if is_ffill:
last_row_df = pd.DataFrame(
[df.iloc[-1, :] for df in ray.get(new_df._df[:-1])]
)
else:
last_row_df = pd.DataFrame(
[df.iloc[0, :] for df in ray.get(new_df._df[1:])]
)
last_row_df.fillna(value=value, method=method, axis=axis,
inplace=True, limit=limit,
downcast=downcast, **kwargs)
if is_ffill:
new_df._df[1:] = [
_deploy_func.remote(fill_in_part, new_df._df[i + 1],
last_row_df.iloc[i, :])
for i in range(len(self._df) - 1)
]
else:
new_df._df[:-1] = [
_deploy_func.remote(fill_in_part, new_df._df[i],
last_row_df.iloc[i])
for i in range(len(self._df) - 1)
]
# TODO: Revist this to improve performance
if limit is not None:
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if inplace:
self._update_inplace(
df=new_df._df,
columns=new_df.columns,
index=new_df.index
)
else:
return new_df
def filter(self, items=None, like=None, regex=None, axis=None):
raise NotImplementedError(
@@ -2533,8 +2803,6 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True):
Returns:
A new Ray DataFrame object.
"""
if sort and not df.index.is_monotonic_increasing:
df = df.sort_index(ascending=True)
if npartitions is not None:
chunksize = int(len(df) / npartitions)
+549 -19
View File
@@ -5,14 +5,24 @@ from __future__ import print_function
import pytest
import numpy as np
import pandas as pd
import pandas.util.testing as tm
import ray.dataframe as rdf
from pandas.tests.frame.common import TestData
@pytest.fixture
def ray_df_equals_pandas(ray_df, pandas_df):
return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
@pytest.fixture
def ray_df_equals(ray_df1, ray_df2):
return rdf.to_pandas(ray_df1).sort_index().equals(
rdf.to_pandas(ray_df2).sort_index()
)
@pytest.fixture
def test_roundtrip(ray_df, pandas_df):
assert(ray_df_equals_pandas(ray_df, pandas_df))
@@ -696,11 +706,16 @@ def test_between_time():
ray_df.between_time(None, None)
def test_bfill():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.between_time(None, None)
@pytest.fixture
def test_bfill(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.bfill(),
test_data.tsframe.bfill()
)
@pytest.fixture
@@ -870,9 +885,92 @@ def test_dot():
def test_drop():
ray_df = create_test_dataframe()
simple = pd.DataFrame({"A": [1, 2, 3, 4], "B": [0, 1, 2, 3]})
ray_simple = rdf.from_pandas(simple, 2)
assert ray_df_equals_pandas(ray_simple.drop("A", axis=1), simple[['B']])
assert ray_df_equals_pandas(ray_simple.drop(["A", "B"], axis='columns'),
simple[[]])
assert ray_df_equals_pandas(ray_simple.drop([0, 1, 3], axis=0),
simple.loc[[2], :])
assert ray_df_equals_pandas(ray_simple.drop([0, 3], axis='index'),
simple.loc[[1, 2], :])
with pytest.raises(NotImplementedError):
ray_df.drop()
pytest.raises(ValueError, ray_simple.drop, 5)
pytest.raises(ValueError, ray_simple.drop, 'C', 1)
pytest.raises(ValueError, ray_simple.drop, [1, 5])
pytest.raises(ValueError, ray_simple.drop, ['A', 'C'], 1)
# errors = 'ignore'
assert ray_df_equals_pandas(ray_simple.drop(5, errors='ignore'), simple)
assert ray_df_equals_pandas(ray_simple.drop([0, 5], errors='ignore'),
simple.loc[[1, 2, 3], :])
assert ray_df_equals_pandas(ray_simple.drop('C', axis=1, errors='ignore'),
simple)
assert ray_df_equals_pandas(ray_simple.drop(['A', 'C'], axis=1,
errors='ignore'),
simple[['B']])
# non-unique - wheee!
nu_df = pd.DataFrame(pd.compat.lzip(range(3), range(-3, 1), list('abc')),
columns=['a', 'a', 'b'])
ray_nu_df = rdf.from_pandas(nu_df, 3)
assert ray_df_equals_pandas(ray_nu_df.drop('a', axis=1), nu_df[['b']])
assert ray_df_equals_pandas(ray_nu_df.drop('b', axis='columns'),
nu_df['a'])
assert ray_df_equals_pandas(ray_nu_df.drop([]), nu_df) # GH 16398
nu_df = nu_df.set_index(pd.Index(['X', 'Y', 'X']))
nu_df.columns = list('abc')
ray_nu_df = rdf.from_pandas(nu_df, 3)
assert ray_df_equals_pandas(ray_nu_df.drop('X', axis='rows'),
nu_df.loc[["Y"], :])
assert ray_df_equals_pandas(ray_nu_df.drop(['X', 'Y'], axis=0),
nu_df.loc[[], :])
# inplace cache issue
# GH 5628
df = pd.DataFrame(np.random.randn(10, 3), columns=list('abc'))
ray_df = rdf.from_pandas(df, 2)
expected = df[~(df.b > 0)]
ray_df.drop(labels=df[df.b > 0].index, inplace=True)
assert ray_df_equals_pandas(ray_df, expected)
def test_drop_api_equivalence():
# equivalence of the labels/axis and index/columns API's (GH12392)
df = pd.DataFrame([[1, 2, 3], [3, 4, 5], [5, 6, 7]],
index=['a', 'b', 'c'],
columns=['d', 'e', 'f'])
ray_df = rdf.from_pandas(df, 3)
res1 = ray_df.drop('a')
res2 = ray_df.drop(index='a')
assert ray_df_equals(res1, res2)
res1 = ray_df.drop('d', 1)
res2 = ray_df.drop(columns='d')
assert ray_df_equals(res1, res2)
res1 = ray_df.drop(labels='e', axis=1)
res2 = ray_df.drop(columns='e')
assert ray_df_equals(res1, res2)
res1 = ray_df.drop(['a'], axis=0)
res2 = ray_df.drop(index=['a'])
assert ray_df_equals(res1, res2)
res1 = ray_df.drop(['a'], axis=0).drop(['d'], axis=1)
res2 = ray_df.drop(index=['a'], columns=['d'])
assert ray_df_equals(res1, res2)
with pytest.raises(ValueError):
ray_df.drop(labels='a', index='b')
with pytest.raises(ValueError):
ray_df.drop(labels='a', columns='b')
with pytest.raises(ValueError):
ray_df.drop(axis=1)
def test_drop_duplicates():
@@ -912,11 +1010,36 @@ def test_equals():
assert not ray_df3.equals(ray_df2)
def test_eval():
ray_df = create_test_dataframe()
def test_eval_df_use_case():
df = pd.DataFrame({'a': np.random.randn(10),
'b': np.random.randn(10)})
ray_df = rdf.from_pandas(df, 5)
df.eval("e = arctan2(sin(a), b)",
engine='python',
parser='pandas', inplace=True)
expect = df.e
ray_df.eval("e = arctan2(sin(a), b)",
engine='python',
parser='pandas', inplace=True)
got = ray_df.e
# TODO: Use a series equality validator.
assert ray_df_equals_pandas(got, pd.DataFrame(expect, columns=['e']))
with pytest.raises(NotImplementedError):
ray_df.eval(None)
def test_eval_df_arithmetic_subexpression():
df = pd.DataFrame({'a': np.random.randn(10),
'b': np.random.randn(10)})
ray_df = rdf.from_pandas(df, 5)
df.eval("e = sin(a + b)",
engine='python',
parser='pandas', inplace=True)
expect = df.e
ray_df.eval("e = sin(a + b)",
engine='python',
parser='pandas', inplace=True)
got = ray_df.e
# TODO: Use a series equality validator.
assert ray_df_equals_pandas(got, pd.DataFrame(expect, columns=['e']))
def test_ewm():
@@ -933,18 +1056,425 @@ def test_expanding():
ray_df.expanding()
def test_ffill():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.ffill()
@pytest.fixture
def test_ffill(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.ffill(),
test_data.tsframe.ffill()
)
def test_fillna():
ray_df = create_test_dataframe()
test_fillna_sanity()
test_fillna_downcast()
test_ffill()
test_ffill2()
test_bfill()
test_bfill2()
test_fillna_inplace()
# test_frame_fillna_limit()
# test_frame_pad_backfill_limit()
test_fillna_dtype_conversion()
test_fillna_skip_certain_blocks()
test_fillna_dict_series()
test_fillna_dataframe()
test_fillna_columns()
test_fillna_invalid_method()
test_fillna_invalid_value()
test_fillna_col_reordering()
with pytest.raises(NotImplementedError):
ray_df.fillna()
@pytest.fixture
def test_fillna_sanity(num_partitions=2):
test_data = TestData()
tf = test_data.tsframe
tf.loc[tf.index[:5], 'A'] = np.nan
tf.loc[tf.index[-5:], 'A'] = np.nan
zero_filled = test_data.tsframe.fillna(0)
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions).fillna(0)
assert ray_df_equals_pandas(ray_df, zero_filled)
padded = test_data.tsframe.fillna(method='pad')
ray_df = rdf.from_pandas(test_data.tsframe,
num_partitions).fillna(method='pad')
assert ray_df_equals_pandas(ray_df, padded)
# mixed type
mf = test_data.mixed_frame
mf.loc[mf.index[5:20], 'foo'] = np.nan
mf.loc[mf.index[-10:], 'A'] = np.nan
result = test_data.mixed_frame.fillna(value=0)
ray_df = rdf.from_pandas(test_data.mixed_frame,
num_partitions).fillna(value=0)
assert ray_df_equals_pandas(ray_df, result)
result = test_data.mixed_frame.fillna(method='pad')
ray_df = rdf.from_pandas(test_data.mixed_frame,
num_partitions).fillna(method='pad')
assert ray_df_equals_pandas(ray_df, result)
pytest.raises(ValueError, test_data.tsframe.fillna)
pytest.raises(ValueError, rdf.from_pandas(test_data.tsframe,
num_partitions).fillna)
with pytest.raises(ValueError):
rdf.from_pandas(test_data.tsframe, num_partitions).fillna(
5, method='ffill'
)
# mixed numeric (but no float16)
mf = test_data.mixed_float.reindex(columns=['A', 'B', 'D'])
mf.loc[mf.index[-10:], 'A'] = np.nan
result = mf.fillna(value=0)
ray_df = rdf.from_pandas(mf, num_partitions).fillna(value=0)
assert ray_df_equals_pandas(ray_df, result)
result = mf.fillna(method='pad')
ray_df = rdf.from_pandas(mf, num_partitions).fillna(method='pad')
assert ray_df_equals_pandas(ray_df, result)
# TODO: Use this when Arrow issue resolves:
# (https://issues.apache.org/jira/browse/ARROW-2122)
# empty frame (GH #2778)
# df = DataFrame(columns=['x'])
# for m in ['pad', 'backfill']:
# df.x.fillna(method=m, inplace=True)
# df.x.fillna(method=m)
# with different dtype (GH3386)
df = pd.DataFrame([['a', 'a', np.nan, 'a'], [
'b', 'b', np.nan, 'b'], ['c', 'c', np.nan, 'c']])
result = df.fillna({2: 'foo'})
ray_df = rdf.from_pandas(df, num_partitions).fillna({2: 'foo'})
assert ray_df_equals_pandas(ray_df, result)
ray_df = rdf.from_pandas(df, num_partitions)
df.fillna({2: 'foo'}, inplace=True)
ray_df.fillna({2: 'foo'}, inplace=True)
assert ray_df_equals_pandas(ray_df, result)
# limit and value
df = pd.DataFrame(np.random.randn(10, 3))
df.iloc[2:7, 0] = np.nan
df.iloc[3:5, 2] = np.nan
# result = df.fillna(999, limit=1)
# ray_df = rdf.from_pandas(df, num_partitions).fillna(999, limit=1)
# assert ray_df_equals_pandas(ray_df, result)
# with datelike
# GH 6344
df = pd.DataFrame({
'Date': [pd.NaT, pd.Timestamp("2014-1-1")],
'Date2': [pd.Timestamp("2013-1-1"), pd.NaT]
})
result = df.fillna(value={'Date': df['Date2']})
ray_df = rdf.from_pandas(df, num_partitions).fillna(
value={'Date': df['Date2']}
)
assert ray_df_equals_pandas(ray_df, result)
# TODO: Use this when Arrow issue resolves:
# (https://issues.apache.org/jira/browse/ARROW-2122)
# with timezone
# GH 15855
"""
df = pd.DataFrame({'A': [pd.Timestamp('2012-11-11 00:00:00+01:00'),
pd.NaT]})
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(ray_df.fillna(method='pad'),
df.fillna(method='pad'))
df = pd.DataFrame({'A': [pd.NaT,
pd.Timestamp('2012-11-11 00:00:00+01:00')]})
ray_df = rdf.from_pandas(df, num_partitions).fillna(method='bfill')
assert ray_df_equals_pandas(ray_df, df.fillna(method='bfill'))
"""
@pytest.fixture
def test_fillna_downcast(num_partitions=2):
# GH 15277
# infer int64 from float64
df = pd.DataFrame({'a': [1., np.nan]})
result = df.fillna(0, downcast='infer')
ray_df = rdf.from_pandas(df, num_partitions).fillna(0, downcast='infer')
assert ray_df_equals_pandas(ray_df, result)
# infer int64 from float64 when fillna value is a dict
df = pd.DataFrame({'a': [1., np.nan]})
result = df.fillna({'a': 0}, downcast='infer')
ray_df = rdf.from_pandas(df, num_partitions).fillna(
{'a': 0}, downcast='infer'
)
assert ray_df_equals_pandas(ray_df, result)
@pytest.fixture
def test_ffill2(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill'),
test_data.tsframe.fillna(method='ffill')
)
@pytest.fixture
def test_bfill2(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='bfill'),
test_data.tsframe.fillna(method='bfill')
)
@pytest.fixture
def test_fillna_inplace(num_partitions=2):
df = pd.DataFrame(np.random.randn(10, 4))
df[1][:4] = np.nan
df[3][-4:] = np.nan
ray_df = rdf.from_pandas(df, num_partitions)
df.fillna(value=0, inplace=True)
assert not ray_df_equals_pandas(ray_df, df)
ray_df.fillna(value=0, inplace=True)
assert ray_df_equals_pandas(ray_df, df)
ray_df = rdf.from_pandas(df, num_partitions).fillna(value={0: 0},
inplace=True)
assert ray_df is None
df[1][:4] = np.nan
df[3][-4:] = np.nan
ray_df = rdf.from_pandas(df, num_partitions)
df.fillna(method='ffill', inplace=True)
assert not ray_df_equals_pandas(ray_df, df)
ray_df.fillna(method='ffill', inplace=True)
assert ray_df_equals_pandas(ray_df, df)
@pytest.fixture
def test_frame_fillna_limit(num_partitions=2):
index = np.arange(10)
df = pd.DataFrame(np.random.randn(10, 4), index=index)
expected = df[:2].reindex(index)
expected = expected.fillna(method='pad', limit=5)
ray_df = rdf.from_pandas(df[:2].reindex(index), num_partitions).fillna(
method='pad', limit=5
)
assert ray_df_equals_pandas(ray_df, expected)
expected = df[-2:].reindex(index)
expected = expected.fillna(method='backfill', limit=5)
ray_df = rdf.from_pandas(df[-2:].reindex(index), num_partitions).fillna(
method='backfill', limit=5
)
assert ray_df_equals_pandas(ray_df, expected)
@pytest.fixture
def test_frame_pad_backfill_limit(num_partitions=2):
index = np.arange(10)
df = pd.DataFrame(np.random.randn(10, 4), index=index)
result = df[:2].reindex(index)
ray_df = rdf.from_pandas(result, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='pad', limit=5),
result.fillna(method='pad', limit=5)
)
result = df[-2:].reindex(index)
ray_df = rdf.from_pandas(result, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='backfill', limit=5),
result.fillna(method='backfill', limit=5)
)
@pytest.fixture
def test_fillna_dtype_conversion(num_partitions=2):
# make sure that fillna on an empty frame works
df = pd.DataFrame(index=["A", "B", "C"], columns=[1, 2, 3, 4, 5])
# empty block
df = pd.DataFrame(index=range(3), columns=['A', 'B'], dtype='float64')
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna('nan'),
df.fillna('nan')
)
# equiv of replace
df = pd.DataFrame(dict(A=[1, np.nan], B=[1., 2.]))
ray_df = rdf.from_pandas(df, num_partitions)
for v in ['', 1, np.nan, 1.0]:
assert ray_df_equals_pandas(
ray_df.fillna(v),
df.fillna(v)
)
@pytest.fixture
def test_fillna_skip_certain_blocks(num_partitions=2):
# don't try to fill boolean, int blocks
df = pd.DataFrame(np.random.randn(10, 4).astype(int))
ray_df = rdf.from_pandas(df, num_partitions)
# it works!
assert ray_df_equals_pandas(
ray_df.fillna(np.nan),
df.fillna(np.nan)
)
@pytest.fixture
def test_fillna_dict_series(num_partitions=2):
df = pd.DataFrame({'a': [np.nan, 1, 2, np.nan, np.nan],
'b': [1, 2, 3, np.nan, np.nan],
'c': [np.nan, 1, 2, 3, 4]})
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna({'a': 0, 'b': 5}),
df.fillna({'a': 0, 'b': 5})
)
# it works
assert ray_df_equals_pandas(
ray_df.fillna({'a': 0, 'b': 5, 'd': 7}),
df.fillna({'a': 0, 'b': 5, 'd': 7})
)
# Series treated same as dict
assert ray_df_equals_pandas(
ray_df.fillna(df.max()),
df.fillna(df.max())
)
@pytest.fixture
def test_fillna_dataframe(num_partitions=2):
# GH 8377
df = pd.DataFrame({'a': [np.nan, 1, 2, np.nan, np.nan],
'b': [1, 2, 3, np.nan, np.nan],
'c': [np.nan, 1, 2, 3, 4]},
index=list('VWXYZ'))
ray_df = rdf.from_pandas(df, num_partitions)
# df2 may have different index and columns
df2 = pd.DataFrame({'a': [np.nan, 10, 20, 30, 40],
'b': [50, 60, 70, 80, 90],
'foo': ['bar'] * 5},
index=list('VWXuZ'))
# only those columns and indices which are shared get filled
assert ray_df_equals_pandas(
ray_df.fillna(df2),
df.fillna(df2)
)
@pytest.fixture
def test_fillna_columns(num_partitions=2):
df = pd.DataFrame(np.random.randn(10, 10))
df.values[:, ::2] = np.nan
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill', axis=1),
df.fillna(method='ffill', axis=1)
)
df.insert(6, 'foo', 5)
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill', axis=1),
df.fillna(method='ffill', axis=1)
)
@pytest.fixture
def test_fillna_invalid_method(num_partitions=2):
test_data = TestData()
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
with tm.assert_raises_regex(ValueError, 'ffil'):
ray_df.fillna(method='ffil')
@pytest.fixture
def test_fillna_invalid_value(num_partitions=2):
test_data = TestData()
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
# list
pytest.raises(TypeError, ray_df.fillna, [1, 2])
# tuple
pytest.raises(TypeError, ray_df.fillna, (1, 2))
# TODO: Uncomment when iloc is implemented
# frame with series
# pytest.raises(ValueError, ray_df.iloc[:, 0].fillna, ray_df)
@pytest.fixture
def test_fillna_col_reordering(num_partitions=2):
cols = ["COL." + str(i) for i in range(5, 0, -1)]
data = np.random.rand(20, 5)
df = pd.DataFrame(index=range(20), columns=cols, data=data)
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill'),
df.fillna(method='ffill')
)
"""
TODO: Use this when Arrow issue resolves:
(https://issues.apache.org/jira/browse/ARROW-2122)
@pytest.fixture
def test_fillna_datetime_columns(num_partitions=2):
# GH 7095
df = pd.DataFrame({'A': [-1, -2, np.nan],
'B': date_range('20130101', periods=3),
'C': ['foo', 'bar', None],
'D': ['foo2', 'bar2', None]},
index=date_range('20130110', periods=3))
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna('?'),
df.fillna('?')
)
df = pd.DataFrame({'A': [-1, -2, np.nan],
'B': [pd.Timestamp('2013-01-01'),
pd.Timestamp('2013-01-02'), pd.NaT],
'C': ['foo', 'bar', None],
'D': ['foo2', 'bar2', None]},
index=date_range('20130110', periods=3))
ray_df = rdf.from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna('?'),
df.fillna('?')
)
"""
def test_filter():