From 2b747ba46cee4445c8ff1640a2a5d7c88e2d1723 Mon Sep 17 00:00:00 2001 From: Peter Veerman Date: Fri, 9 Mar 2018 07:37:27 -0800 Subject: [PATCH] [DataFrame] Implement .fillna(), .ffill(), .bfill(), .eval(), and .drop() (#1544) * Implement ray.DataFrame.drop w/ tests * Implement ray.DataFrame.eval w/ tests Fix flake8 issues * Fix flake8 issues in dataframe.py * Implement fillna * Implement fillna * Implement ffill and bfill * Define helper functions outside of method invocation * Implement ray.DataFrame.eval w/ tests * Index update * Fixed transpose bug with nan values * Fix lint * Implement fillna * Use ray index to check if labels exist in df * Fix ValueError catching * Remove duplicate test methods * Add documentation for .fillna(), .ffill(), .bfill(), .eval(), and .drop() Fix flake8 errors * Remove notebook files * Change fillna, eval, drop to use new index type * Fix documentation for fillna, eval and drop temp Temp temp temp temp * Update drop to work with new type of ray index * Fix flake8 errors * Refactor fillna fix for index --- python/ray/dataframe/dataframe.py | 314 ++++++++++- python/ray/dataframe/test/test_dataframe.py | 568 +++++++++++++++++++- 2 files changed, 840 insertions(+), 42 deletions(-) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 6f1c6fee8..3702624ab 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -232,15 +232,16 @@ class DataFrame(object): """ assert(len(df) > 0) - if df: + if df is not None: self._df = df - if columns: + if columns is not None: self.columns = columns - if index: - self.index = index self._lengths, self._index = _compute_length_and_index.remote(self._df) + if index is not None: + self.index = index + def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -458,8 +459,6 @@ class DataFrame(object): DataFrame with the dropna applied. 
""" raise NotImplementedError("Not yet") - if how != 'any' and how != 'all': - raise ValueError(" not correctly set.") def add(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError( @@ -579,9 +578,16 @@ class DataFrame(object): "github.com/ray-project/ray.") def bfill(self, axis=None, inplace=False, limit=None, downcast=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Synonym for DataFrame.fillna(method='bfill') + """ + new_df = self.fillna( + method='bfill', axis=axis, limit=limit, downcast=downcast + ) + if inplace: + self._df = new_df._df + self.columns = new_df.columns + else: + return new_df def bool(self): """Return the bool of a single element PandasObject. @@ -731,9 +737,100 @@ class DataFrame(object): def drop(self, labels=None, axis=0, index=None, columns=None, level=None, inplace=False, errors='raise'): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Return new object with labels in requested axis removed. + Args: + labels: Index or column labels to drop. + + axis: Whether to drop labels from the index (0 / ‘index’) or + columns (1 / ‘columns’). + + index, columns: Alternative to specifying axis (labels, axis=1 is + equivalent to columns=labels). + + level: For MultiIndex + + inplace: If True, do operation inplace and return None. + + errors: If ‘ignore’, suppress error and existing labels are + dropped. 
+ Returns: + dropped : type of caller + """ + # inplace = validate_bool_kwarg(inplace, "inplace") + if labels is not None: + if index is not None or columns is not None: + raise ValueError("Cannot specify both 'labels' and " + "'index'/'columns'") + elif index is None and columns is None: + raise ValueError("Need to specify at least one of 'labels', " + "'index' or 'columns'") + new_df = self + is_axis_zero = axis is None or axis == 0 or axis == 'index'\ + or axis == 'rows' + try: + if (is_axis_zero and columns is None) or index is not None: + values = labels if labels is not None else index + try: + try: + if len(values) == 0: + if inplace: + return + else: + return self + filtered_index = self._index.loc[list(values)] + except TypeError: + filtered_index = self._index.loc[[values]] + except KeyError: + raise ValueError( + "{} is not contained in the index".format(labels)) + + filtered_index.dropna(inplace=True) + + partition_idx = [ + filtered_index.loc[ + filtered_index['partition'] == i + ]['index_within_partition'] + for i in range(len(self._df)) + ] + + new_df = [ + _deploy_func.remote( + lambda df, new_labels: df.drop( + new_labels, level=level, errors='ignore'), + self._df[i], partition_idx[i] + ) + for i in range(len(self._df)) + ] + new_index = self._index.copy().drop(values, errors=errors) + new_df = DataFrame(new_df, self.columns, index=new_index.index) + except (ValueError, KeyError): + if errors == 'raise': + raise + new_df = self + + try: + if not is_axis_zero or columns is not None: + values = labels if labels else columns + new_df = new_df._map_partitions( + lambda df: df.drop( + values, axis=1, level=level, errors='ignore') + ) + new_columns = self.columns.to_series().drop(values, + errors=errors) + new_df.columns = pd.Index(new_columns) + except (ValueError, KeyError): + if errors == 'raise': + raise + new_df = self + + if inplace: + self._update_inplace( + df=new_df._df, + index=new_df.index, + columns=new_df.columns + ) + else: + return new_df 
def drop_duplicates(self, subset=None, keep='first', inplace=False): raise NotImplementedError( @@ -784,9 +881,61 @@ class DataFrame(object): return True def eval(self, expr, inplace=False, **kwargs): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Evaluate a Python expression as a string using various backends. + Args: + expr: The expression to evaluate. This string cannot contain any + Python statements, only Python expressions. + + parser: The parser to use to construct the syntax tree from the + expression. The default of 'pandas' parses code slightly + different than standard Python. Alternatively, you can parse + an expression using the 'python' parser to retain strict + Python semantics. See the enhancing performance documentation + for more details. + + engine: The engine used to evaluate the expression. + + truediv: Whether to use true division, like in Python >= 3 + + local_dict: A dictionary of local variables, taken from locals() + by default. + + global_dict: A dictionary of global variables, taken from + globals() by default. + + resolvers: A list of objects implementing the __getitem__ special + method that you can use to inject an additional collection + of namespaces to use for variable lookup. For example, this is + used in the query() method to inject the index and columns + variables that refer to their respective DataFrame instance + attributes. + + level: The number of prior stack frames to traverse and add to + the current scope. Most users will not need to change this + parameter. + + target: This is the target object for assignment. It is used when + there is variable assignment in the expression. If so, then + target must support item assignment with string keys, and if a + copy is being returned, it must also support .copy(). + + inplace: If target is provided, and the expression mutates target, + whether to modify target inplace. 
Otherwise, return a copy of + target with the mutation. + Returns: + ndarray, numeric scalar, DataFrame, Series + """ + inplace = validate_bool_kwarg(inplace, "inplace") + new_df = self._map_partitions(lambda df: df.eval(expr, inplace=False, + **kwargs)) + new_df.columns = new_df.columns.insert(self.columns.size, 'e') + if inplace: + # TODO: return ray series instead of ray df + self.e = new_df.drop(columns=self.columns) + self._df = new_df._df + self.columns = new_df.columns + else: + return new_df def ewm(self, com=None, span=None, halflife=None, alpha=None, min_periods=0, freq=None, adjust=True, ignore_na=False, axis=0): @@ -800,15 +949,136 @@ class DataFrame(object): "github.com/ray-project/ray.") def ffill(self, axis=None, inplace=False, limit=None, downcast=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Synonym for DataFrame.fillna(method='ffill') + """ + new_df = self.fillna( + method='ffill', axis=axis, limit=limit, downcast=downcast + ) + if inplace: + self._df = new_df._df + self.columns = new_df.columns + else: + return new_df def fillna(self, value=None, method=None, axis=None, inplace=False, limit=None, downcast=None, **kwargs): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Fill NA/NaN values using the specified method. + + Args: + value: Value to use to fill holes. This value cannot be a list. + + method: Method to use for filling holes in reindexed Series. + pad / ffill: propagate last valid observation forward to the + next valid one. + backfill / bfill: use NEXT valid observation to fill gap. + + axis: 0 or ‘index’, 1 or ‘columns’. + + inplace: If True, fill in place. Note: this will modify any other + views on this object. + + limit: If method is specified, this is the maximum number of + consecutive NaN values to forward/backward fill. 
In other + words, if there is a gap with more than this number of + consecutive NaNs, it will only be partially filled. If method + is not specified, this is the maximum number of entries along + the entire axis where NaNs will be filled. Must be greater + than 0 if not None. + + downcast: A dict of item->dtype of what to downcast if possible, + or the string ‘infer’ which will try to downcast to an + appropriate equal type. + + Returns: + filled: DataFrame + """ + if isinstance(value, (list, tuple)): + raise TypeError('"value" parameter must be a scalar or dict, but ' + 'you passed a "{0}"'.format(type(value).__name__)) + if value is None and method is None: + raise ValueError('must specify a fill method or value') + if value is not None and method is not None: + raise ValueError('cannot specify both a fill method and value') + if method is not None and method not in ['backfill', 'bfill', 'pad', + 'ffill']: + expecting = 'pad (ffill) or backfill (bfill)' + msg = 'Invalid fill method. Expecting {expecting}. 
Got {method}'\ + .format(expecting=expecting, method=method) + raise ValueError(msg) + + partition_idx = [ + self._index.loc[ + self._index['partition'] == i + ].index + for i in range(len(self._df)) + ] + + def fillna_part(df, real_index): + old_index = df.index + df.index = real_index + new_df = df.fillna(value=value, method=method, axis=axis, + limit=limit, downcast=downcast, **kwargs) + new_df.index = old_index + return new_df + + new_df = [ + _deploy_func.remote( + fillna_part, + part, partition_idx[i] + ) + for i, part in enumerate(self._df) + ] + + new_df = DataFrame(new_df, self.columns, self.index) + + is_bfill = method is not None and method in ['backfill', 'bfill'] + is_ffill = method is not None and method in ['pad', 'ffill'] + is_axis_zero = axis is None or axis == 0 or axis == 'index'\ + or axis == 'rows' + + if is_axis_zero and (is_bfill or is_ffill): + def fill_in_part(part, row): + return part.fillna(value=row, axis=axis, limit=limit, + downcast=downcast, **kwargs) + last_row_df = None + if is_ffill: + last_row_df = pd.DataFrame( + [df.iloc[-1, :] for df in ray.get(new_df._df[:-1])] + ) + else: + last_row_df = pd.DataFrame( + [df.iloc[0, :] for df in ray.get(new_df._df[1:])] + ) + last_row_df.fillna(value=value, method=method, axis=axis, + inplace=True, limit=limit, + downcast=downcast, **kwargs) + if is_ffill: + new_df._df[1:] = [ + _deploy_func.remote(fill_in_part, new_df._df[i + 1], + last_row_df.iloc[i, :]) + for i in range(len(self._df) - 1) + ] + else: + new_df._df[:-1] = [ + _deploy_func.remote(fill_in_part, new_df._df[i], + last_row_df.iloc[i]) + for i in range(len(self._df) - 1) + ] + + # TODO: Revisit this to improve performance + if limit is not None: + raise NotImplementedError( + "To contribute to Pandas on Ray, please visit " + "github.com/ray-project/ray.") + + if inplace: + self._update_inplace( + df=new_df._df, + columns=new_df.columns, + index=new_df.index + ) + else: + return new_df def filter(self, items=None, like=None, 
regex=None, axis=None): raise NotImplementedError( @@ -2533,8 +2803,6 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True): Returns: A new Ray DataFrame object. """ - if sort and not df.index.is_monotonic_increasing: - df = df.sort_index(ascending=True) if npartitions is not None: chunksize = int(len(df) / npartitions) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 032411693..e38063db3 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -5,14 +5,24 @@ from __future__ import print_function import pytest import numpy as np import pandas as pd +import pandas.util.testing as tm import ray.dataframe as rdf +from pandas.tests.frame.common import TestData + @pytest.fixture def ray_df_equals_pandas(ray_df, pandas_df): return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) +@pytest.fixture +def ray_df_equals(ray_df1, ray_df2): + return rdf.to_pandas(ray_df1).sort_index().equals( + rdf.to_pandas(ray_df2).sort_index() + ) + + @pytest.fixture def test_roundtrip(ray_df, pandas_df): assert(ray_df_equals_pandas(ray_df, pandas_df)) @@ -696,11 +706,16 @@ def test_between_time(): ray_df.between_time(None, None) -def test_bfill(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.between_time(None, None) +@pytest.fixture +def test_bfill(num_partitions=2): + test_data = TestData() + test_data.tsframe['A'][:5] = np.nan + test_data.tsframe['A'][-5:] = np.nan + ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + assert ray_df_equals_pandas( + ray_df.bfill(), + test_data.tsframe.bfill() + ) @pytest.fixture @@ -870,9 +885,92 @@ def test_dot(): def test_drop(): ray_df = create_test_dataframe() + simple = pd.DataFrame({"A": [1, 2, 3, 4], "B": [0, 1, 2, 3]}) + ray_simple = rdf.from_pandas(simple, 2) + assert ray_df_equals_pandas(ray_simple.drop("A", axis=1), simple[['B']]) + assert 
ray_df_equals_pandas(ray_simple.drop(["A", "B"], axis='columns'), + simple[[]]) + assert ray_df_equals_pandas(ray_simple.drop([0, 1, 3], axis=0), + simple.loc[[2], :]) + assert ray_df_equals_pandas(ray_simple.drop([0, 3], axis='index'), + simple.loc[[1, 2], :]) - with pytest.raises(NotImplementedError): - ray_df.drop() + pytest.raises(ValueError, ray_simple.drop, 5) + pytest.raises(ValueError, ray_simple.drop, 'C', 1) + pytest.raises(ValueError, ray_simple.drop, [1, 5]) + pytest.raises(ValueError, ray_simple.drop, ['A', 'C'], 1) + + # errors = 'ignore' + assert ray_df_equals_pandas(ray_simple.drop(5, errors='ignore'), simple) + assert ray_df_equals_pandas(ray_simple.drop([0, 5], errors='ignore'), + simple.loc[[1, 2, 3], :]) + assert ray_df_equals_pandas(ray_simple.drop('C', axis=1, errors='ignore'), + simple) + assert ray_df_equals_pandas(ray_simple.drop(['A', 'C'], axis=1, + errors='ignore'), + simple[['B']]) + + # non-unique - wheee! + nu_df = pd.DataFrame(pd.compat.lzip(range(3), range(-3, 1), list('abc')), + columns=['a', 'a', 'b']) + ray_nu_df = rdf.from_pandas(nu_df, 3) + assert ray_df_equals_pandas(ray_nu_df.drop('a', axis=1), nu_df[['b']]) + assert ray_df_equals_pandas(ray_nu_df.drop('b', axis='columns'), + nu_df['a']) + assert ray_df_equals_pandas(ray_nu_df.drop([]), nu_df) # GH 16398 + + nu_df = nu_df.set_index(pd.Index(['X', 'Y', 'X'])) + nu_df.columns = list('abc') + ray_nu_df = rdf.from_pandas(nu_df, 3) + assert ray_df_equals_pandas(ray_nu_df.drop('X', axis='rows'), + nu_df.loc[["Y"], :]) + assert ray_df_equals_pandas(ray_nu_df.drop(['X', 'Y'], axis=0), + nu_df.loc[[], :]) + + # inplace cache issue + # GH 5628 + df = pd.DataFrame(np.random.randn(10, 3), columns=list('abc')) + ray_df = rdf.from_pandas(df, 2) + expected = df[~(df.b > 0)] + ray_df.drop(labels=df[df.b > 0].index, inplace=True) + assert ray_df_equals_pandas(ray_df, expected) + + +def test_drop_api_equivalence(): + # equivalence of the labels/axis and index/columns API's (GH12392) + df = 
pd.DataFrame([[1, 2, 3], [3, 4, 5], [5, 6, 7]], + index=['a', 'b', 'c'], + columns=['d', 'e', 'f']) + ray_df = rdf.from_pandas(df, 3) + + res1 = ray_df.drop('a') + res2 = ray_df.drop(index='a') + assert ray_df_equals(res1, res2) + + res1 = ray_df.drop('d', 1) + res2 = ray_df.drop(columns='d') + assert ray_df_equals(res1, res2) + + res1 = ray_df.drop(labels='e', axis=1) + res2 = ray_df.drop(columns='e') + assert ray_df_equals(res1, res2) + + res1 = ray_df.drop(['a'], axis=0) + res2 = ray_df.drop(index=['a']) + assert ray_df_equals(res1, res2) + + res1 = ray_df.drop(['a'], axis=0).drop(['d'], axis=1) + res2 = ray_df.drop(index=['a'], columns=['d']) + assert ray_df_equals(res1, res2) + + with pytest.raises(ValueError): + ray_df.drop(labels='a', index='b') + + with pytest.raises(ValueError): + ray_df.drop(labels='a', columns='b') + + with pytest.raises(ValueError): + ray_df.drop(axis=1) def test_drop_duplicates(): @@ -912,11 +1010,36 @@ def test_equals(): assert not ray_df3.equals(ray_df2) -def test_eval(): - ray_df = create_test_dataframe() +def test_eval_df_use_case(): + df = pd.DataFrame({'a': np.random.randn(10), + 'b': np.random.randn(10)}) + ray_df = rdf.from_pandas(df, 5) + df.eval("e = arctan2(sin(a), b)", + engine='python', + parser='pandas', inplace=True) + expect = df.e + ray_df.eval("e = arctan2(sin(a), b)", + engine='python', + parser='pandas', inplace=True) + got = ray_df.e + # TODO: Use a series equality validator. + assert ray_df_equals_pandas(got, pd.DataFrame(expect, columns=['e'])) - with pytest.raises(NotImplementedError): - ray_df.eval(None) + +def test_eval_df_arithmetic_subexpression(): + df = pd.DataFrame({'a': np.random.randn(10), + 'b': np.random.randn(10)}) + ray_df = rdf.from_pandas(df, 5) + df.eval("e = sin(a + b)", + engine='python', + parser='pandas', inplace=True) + expect = df.e + ray_df.eval("e = sin(a + b)", + engine='python', + parser='pandas', inplace=True) + got = ray_df.e + # TODO: Use a series equality validator. 
+ assert ray_df_equals_pandas(got, pd.DataFrame(expect, columns=['e'])) def test_ewm(): @@ -933,18 +1056,425 @@ def test_expanding(): ray_df.expanding() -def test_ffill(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.ffill() +@pytest.fixture +def test_ffill(num_partitions=2): + test_data = TestData() + test_data.tsframe['A'][:5] = np.nan + test_data.tsframe['A'][-5:] = np.nan + ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + assert ray_df_equals_pandas( + ray_df.ffill(), + test_data.tsframe.ffill() + ) def test_fillna(): - ray_df = create_test_dataframe() + test_fillna_sanity() + test_fillna_downcast() + test_ffill() + test_ffill2() + test_bfill() + test_bfill2() + test_fillna_inplace() + # test_frame_fillna_limit() + # test_frame_pad_backfill_limit() + test_fillna_dtype_conversion() + test_fillna_skip_certain_blocks() + test_fillna_dict_series() + test_fillna_dataframe() + test_fillna_columns() + test_fillna_invalid_method() + test_fillna_invalid_value() + test_fillna_col_reordering() - with pytest.raises(NotImplementedError): - ray_df.fillna() + +@pytest.fixture +def test_fillna_sanity(num_partitions=2): + test_data = TestData() + tf = test_data.tsframe + tf.loc[tf.index[:5], 'A'] = np.nan + tf.loc[tf.index[-5:], 'A'] = np.nan + + zero_filled = test_data.tsframe.fillna(0) + ray_df = rdf.from_pandas(test_data.tsframe, num_partitions).fillna(0) + assert ray_df_equals_pandas(ray_df, zero_filled) + + padded = test_data.tsframe.fillna(method='pad') + ray_df = rdf.from_pandas(test_data.tsframe, + num_partitions).fillna(method='pad') + assert ray_df_equals_pandas(ray_df, padded) + + # mixed type + mf = test_data.mixed_frame + mf.loc[mf.index[5:20], 'foo'] = np.nan + mf.loc[mf.index[-10:], 'A'] = np.nan + + result = test_data.mixed_frame.fillna(value=0) + ray_df = rdf.from_pandas(test_data.mixed_frame, + num_partitions).fillna(value=0) + assert ray_df_equals_pandas(ray_df, result) + + result = 
test_data.mixed_frame.fillna(method='pad') + ray_df = rdf.from_pandas(test_data.mixed_frame, + num_partitions).fillna(method='pad') + assert ray_df_equals_pandas(ray_df, result) + + pytest.raises(ValueError, test_data.tsframe.fillna) + pytest.raises(ValueError, rdf.from_pandas(test_data.tsframe, + num_partitions).fillna) + with pytest.raises(ValueError): + rdf.from_pandas(test_data.tsframe, num_partitions).fillna( + 5, method='ffill' + ) + + # mixed numeric (but no float16) + mf = test_data.mixed_float.reindex(columns=['A', 'B', 'D']) + mf.loc[mf.index[-10:], 'A'] = np.nan + result = mf.fillna(value=0) + ray_df = rdf.from_pandas(mf, num_partitions).fillna(value=0) + assert ray_df_equals_pandas(ray_df, result) + + result = mf.fillna(method='pad') + ray_df = rdf.from_pandas(mf, num_partitions).fillna(method='pad') + assert ray_df_equals_pandas(ray_df, result) + + # TODO: Use this when Arrow issue resolves: + # (https://issues.apache.org/jira/browse/ARROW-2122) + # empty frame (GH #2778) + # df = DataFrame(columns=['x']) + # for m in ['pad', 'backfill']: + # df.x.fillna(method=m, inplace=True) + # df.x.fillna(method=m) + + # with different dtype (GH3386) + df = pd.DataFrame([['a', 'a', np.nan, 'a'], [ + 'b', 'b', np.nan, 'b'], ['c', 'c', np.nan, 'c']]) + + result = df.fillna({2: 'foo'}) + ray_df = rdf.from_pandas(df, num_partitions).fillna({2: 'foo'}) + assert ray_df_equals_pandas(ray_df, result) + + ray_df = rdf.from_pandas(df, num_partitions) + df.fillna({2: 'foo'}, inplace=True) + ray_df.fillna({2: 'foo'}, inplace=True) + assert ray_df_equals_pandas(ray_df, result) + + # limit and value + df = pd.DataFrame(np.random.randn(10, 3)) + df.iloc[2:7, 0] = np.nan + df.iloc[3:5, 2] = np.nan + + # result = df.fillna(999, limit=1) + # ray_df = rdf.from_pandas(df, num_partitions).fillna(999, limit=1) + + # assert ray_df_equals_pandas(ray_df, result) + + # with datelike + # GH 6344 + df = pd.DataFrame({ + 'Date': [pd.NaT, pd.Timestamp("2014-1-1")], + 'Date2': 
[pd.Timestamp("2013-1-1"), pd.NaT] + }) + result = df.fillna(value={'Date': df['Date2']}) + ray_df = rdf.from_pandas(df, num_partitions).fillna( + value={'Date': df['Date2']} + ) + assert ray_df_equals_pandas(ray_df, result) + + # TODO: Use this when Arrow issue resolves: + # (https://issues.apache.org/jira/browse/ARROW-2122) + # with timezone + # GH 15855 + """ + df = pd.DataFrame({'A': [pd.Timestamp('2012-11-11 00:00:00+01:00'), + pd.NaT]}) + ray_df = rdf.from_pandas(df, num_partitions) + assert ray_df_equals_pandas(ray_df.fillna(method='pad'), + df.fillna(method='pad')) + + df = pd.DataFrame({'A': [pd.NaT, + pd.Timestamp('2012-11-11 00:00:00+01:00')]}) + ray_df = rdf.from_pandas(df, num_partitions).fillna(method='bfill') + assert ray_df_equals_pandas(ray_df, df.fillna(method='bfill')) + """ + + +@pytest.fixture +def test_fillna_downcast(num_partitions=2): + # GH 15277 + # infer int64 from float64 + df = pd.DataFrame({'a': [1., np.nan]}) + result = df.fillna(0, downcast='infer') + ray_df = rdf.from_pandas(df, num_partitions).fillna(0, downcast='infer') + assert ray_df_equals_pandas(ray_df, result) + + # infer int64 from float64 when fillna value is a dict + df = pd.DataFrame({'a': [1., np.nan]}) + result = df.fillna({'a': 0}, downcast='infer') + ray_df = rdf.from_pandas(df, num_partitions).fillna( + {'a': 0}, downcast='infer' + ) + assert ray_df_equals_pandas(ray_df, result) + + +@pytest.fixture +def test_ffill2(num_partitions=2): + test_data = TestData() + test_data.tsframe['A'][:5] = np.nan + test_data.tsframe['A'][-5:] = np.nan + ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna(method='ffill'), + test_data.tsframe.fillna(method='ffill') + ) + + +@pytest.fixture +def test_bfill2(num_partitions=2): + test_data = TestData() + test_data.tsframe['A'][:5] = np.nan + test_data.tsframe['A'][-5:] = np.nan + ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + assert ray_df_equals_pandas( + 
ray_df.fillna(method='bfill'), + test_data.tsframe.fillna(method='bfill') + ) + + +@pytest.fixture +def test_fillna_inplace(num_partitions=2): + df = pd.DataFrame(np.random.randn(10, 4)) + df[1][:4] = np.nan + df[3][-4:] = np.nan + + ray_df = rdf.from_pandas(df, num_partitions) + df.fillna(value=0, inplace=True) + assert not ray_df_equals_pandas(ray_df, df) + + ray_df.fillna(value=0, inplace=True) + assert ray_df_equals_pandas(ray_df, df) + + ray_df = rdf.from_pandas(df, num_partitions).fillna(value={0: 0}, + inplace=True) + assert ray_df is None + + df[1][:4] = np.nan + df[3][-4:] = np.nan + ray_df = rdf.from_pandas(df, num_partitions) + df.fillna(method='ffill', inplace=True) + + assert not ray_df_equals_pandas(ray_df, df) + + ray_df.fillna(method='ffill', inplace=True) + assert ray_df_equals_pandas(ray_df, df) + + +@pytest.fixture +def test_frame_fillna_limit(num_partitions=2): + index = np.arange(10) + df = pd.DataFrame(np.random.randn(10, 4), index=index) + + expected = df[:2].reindex(index) + expected = expected.fillna(method='pad', limit=5) + + ray_df = rdf.from_pandas(df[:2].reindex(index), num_partitions).fillna( + method='pad', limit=5 + ) + assert ray_df_equals_pandas(ray_df, expected) + + expected = df[-2:].reindex(index) + expected = expected.fillna(method='backfill', limit=5) + ray_df = rdf.from_pandas(df[-2:].reindex(index), num_partitions).fillna( + method='backfill', limit=5 + ) + assert ray_df_equals_pandas(ray_df, expected) + + +@pytest.fixture +def test_frame_pad_backfill_limit(num_partitions=2): + index = np.arange(10) + df = pd.DataFrame(np.random.randn(10, 4), index=index) + + result = df[:2].reindex(index) + ray_df = rdf.from_pandas(result, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna(method='pad', limit=5), + result.fillna(method='pad', limit=5) + ) + + result = df[-2:].reindex(index) + ray_df = rdf.from_pandas(result, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna(method='backfill', limit=5), + 
result.fillna(method='backfill', limit=5) + ) + + +@pytest.fixture +def test_fillna_dtype_conversion(num_partitions=2): + # make sure that fillna on an empty frame works + df = pd.DataFrame(index=["A", "B", "C"], columns=[1, 2, 3, 4, 5]) + + # empty block + df = pd.DataFrame(index=range(3), columns=['A', 'B'], dtype='float64') + ray_df = rdf.from_pandas(df, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna('nan'), + df.fillna('nan') + ) + + # equiv of replace + df = pd.DataFrame(dict(A=[1, np.nan], B=[1., 2.])) + ray_df = rdf.from_pandas(df, num_partitions) + for v in ['', 1, np.nan, 1.0]: + assert ray_df_equals_pandas( + ray_df.fillna(v), + df.fillna(v) + ) + + +@pytest.fixture +def test_fillna_skip_certain_blocks(num_partitions=2): + # don't try to fill boolean, int blocks + + df = pd.DataFrame(np.random.randn(10, 4).astype(int)) + ray_df = rdf.from_pandas(df, num_partitions) + + # it works! + assert ray_df_equals_pandas( + ray_df.fillna(np.nan), + df.fillna(np.nan) + ) + + +@pytest.fixture +def test_fillna_dict_series(num_partitions=2): + df = pd.DataFrame({'a': [np.nan, 1, 2, np.nan, np.nan], + 'b': [1, 2, 3, np.nan, np.nan], + 'c': [np.nan, 1, 2, 3, 4]}) + ray_df = rdf.from_pandas(df, num_partitions) + + assert ray_df_equals_pandas( + ray_df.fillna({'a': 0, 'b': 5}), + df.fillna({'a': 0, 'b': 5}) + ) + + # it works + assert ray_df_equals_pandas( + ray_df.fillna({'a': 0, 'b': 5, 'd': 7}), + df.fillna({'a': 0, 'b': 5, 'd': 7}) + ) + + # Series treated same as dict + assert ray_df_equals_pandas( + ray_df.fillna(df.max()), + df.fillna(df.max()) + ) + + +@pytest.fixture +def test_fillna_dataframe(num_partitions=2): + # GH 8377 + df = pd.DataFrame({'a': [np.nan, 1, 2, np.nan, np.nan], + 'b': [1, 2, 3, np.nan, np.nan], + 'c': [np.nan, 1, 2, 3, 4]}, + index=list('VWXYZ')) + ray_df = rdf.from_pandas(df, num_partitions) + + # df2 may have different index and columns + df2 = pd.DataFrame({'a': [np.nan, 10, 20, 30, 40], + 'b': [50, 60, 70, 80, 90], + 'foo': 
['bar'] * 5}, + index=list('VWXuZ')) + + # only those columns and indices which are shared get filled + assert ray_df_equals_pandas( + ray_df.fillna(df2), + df.fillna(df2) + ) + + +@pytest.fixture +def test_fillna_columns(num_partitions=2): + df = pd.DataFrame(np.random.randn(10, 10)) + df.values[:, ::2] = np.nan + ray_df = rdf.from_pandas(df, num_partitions) + + assert ray_df_equals_pandas( + ray_df.fillna(method='ffill', axis=1), + df.fillna(method='ffill', axis=1) + ) + + df.insert(6, 'foo', 5) + ray_df = rdf.from_pandas(df, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna(method='ffill', axis=1), + df.fillna(method='ffill', axis=1) + ) + + +@pytest.fixture +def test_fillna_invalid_method(num_partitions=2): + test_data = TestData() + ray_df = rdf.from_pandas(test_data.frame, num_partitions) + with tm.assert_raises_regex(ValueError, 'ffil'): + ray_df.fillna(method='ffil') + + +@pytest.fixture +def test_fillna_invalid_value(num_partitions=2): + test_data = TestData() + ray_df = rdf.from_pandas(test_data.frame, num_partitions) + # list + pytest.raises(TypeError, ray_df.fillna, [1, 2]) + # tuple + pytest.raises(TypeError, ray_df.fillna, (1, 2)) + # TODO: Uncomment when iloc is implemented + # frame with series + # pytest.raises(ValueError, ray_df.iloc[:, 0].fillna, ray_df) + + +@pytest.fixture +def test_fillna_col_reordering(num_partitions=2): + cols = ["COL." 
+ str(i) for i in range(5, 0, -1)] + data = np.random.rand(20, 5) + df = pd.DataFrame(index=range(20), columns=cols, data=data) + ray_df = rdf.from_pandas(df, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna(method='ffill'), + df.fillna(method='ffill') + ) + + +""" +TODO: Use this when Arrow issue resolves: +(https://issues.apache.org/jira/browse/ARROW-2122) +@pytest.fixture +def test_fillna_datetime_columns(num_partitions=2): + # GH 7095 + df = pd.DataFrame({'A': [-1, -2, np.nan], + 'B': date_range('20130101', periods=3), + 'C': ['foo', 'bar', None], + 'D': ['foo2', 'bar2', None]}, + index=date_range('20130110', periods=3)) + ray_df = rdf.from_pandas(df, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna('?'), + df.fillna('?') + ) + + df = pd.DataFrame({'A': [-1, -2, np.nan], + 'B': [pd.Timestamp('2013-01-01'), + pd.Timestamp('2013-01-02'), pd.NaT], + 'C': ['foo', 'bar', None], + 'D': ['foo2', 'bar2', None]}, + index=date_range('20130110', periods=3)) + ray_df = rdf.from_pandas(df, num_partitions) + assert ray_df_equals_pandas( + ray_df.fillna('?'), + df.fillna('?') + ) +""" def test_filter():