diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py
index be1d3b9f4..113d41510 100644
--- a/python/ray/dataframe/dataframe.py
+++ b/python/ray/dataframe/dataframe.py
@@ -9,6 +9,10 @@ from pandas.core.index import _ensure_index_from_sequences
 from pandas._libs import lib
 from pandas.core.dtypes.cast import maybe_upcast_putmask
 from pandas.compat import lzip
+from pandas.core.dtypes.common import (
+    is_bool_dtype,
+    is_numeric_dtype,
+    is_timedelta64_dtype)
 
 import warnings
 import numpy as np
@@ -31,8 +35,7 @@ class DataFrame(object):
         assert(len(df) > 0)
 
         self._df = df
-        self._lengths = [_deploy_func.remote(_get_lengths, d)
-                         for d in self._df]
+        self._compute_lengths()
         self.columns = columns
 
         # this _index object is a pd.DataFrame
@@ -80,6 +83,12 @@ class DataFrame(object):
 
     index = property(_get_index, _set_index)
 
+    def _compute_lengths(self):
+        """Updates the stored lengths of DataFrame partitions
+        """
+        self._lengths = [_deploy_func.remote(_get_lengths, d)
+                         for d in self._df]
+
     def _get_lengths(self):
         """Gets the lengths for each partition and caches it if it wasn't.
 
@@ -200,6 +209,21 @@ class DataFrame(object):
 
         return DataFrame(new_df, self.columns, index=index)
 
+    def _update_inplace(self, df=None, columns=None, index=None):
+        """Updates the current DataFrame inplace; None args are left as-is
+        """
+        if df is not None:
+            assert(len(df) > 0)
+            self._df = df
+            # New partitions invalidate the cached lengths and row index
+            self._compute_lengths()
+            self._index = self._default_index()
+        if columns is not None:
+            self.columns = columns
+        if index is not None:
+            # Applied last so the default index cannot clobber it
+            self.index = index
+
     def add_prefix(self, prefix):
         """Add a prefix to each of the column names.
 
@@ -630,7 +654,52 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def equals(self, other):
-        raise NotImplementedError("Not Yet implemented.")
+        """
+        Checks if other DataFrame is elementwise equal to the current one
+
+        Rows are paired by position, so the two DataFrames may be
+        partitioned differently.
+
+        Args:
+            other: The object to compare against
+
+        Returns:
+            Boolean: True if equal, otherwise False
+        """
+        def helper(df, index, other_series):
+            return df.iloc[index['index_within_partition']] \
+                     .equals(other_series)
+
+        # A non-DataFrame or a different row count can never be equal.
+        # Without this guard extra rows in self would be ignored and
+        # extra rows in other would raise IndexError
+        if not isinstance(other, DataFrame) or \
+                len(self._index) != len(other._index):
+            return False
+
+        results = []
+        other_partition = None
+        other_df = None
+        # iterrows() yields index labels, which are only valid iloc
+        # positions for a default integer index, so count rows explicitly
+        for pos, (_, idx) in enumerate(other._index.iterrows()):
+            if idx['partition'] != other_partition:
+                other_df = ray.get(other._df[idx['partition']])
+                other_partition = idx['partition']
+            # TODO: group series here into full df partitions to reduce
+            # the number of remote calls to helper
+            other_series = other_df.iloc[idx['index_within_partition']]
+            curr_index = self._index.iloc[pos]
+            curr_df = self._df[int(curr_index['partition'])]
+            results.append(_deploy_func.remote(helper,
+                                               curr_df,
+                                               curr_index,
+                                               other_series))
+
+        for r in results:
+            if not ray.get(r):
+                return False
+        return True
 
     def eval(self, expr, inplace=False, **kwargs):
         raise NotImplementedError("Not Yet implemented.")
@@ -1154,7 +1223,18 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def query(self, expr, inplace=False, **kwargs):
-        raise NotImplementedError("Not Yet implemented.")
+        """Queries the Dataframe with a boolean expression
+
+        Returns:
+            A new DataFrame if inplace=False
+        """
+        new_dfs = [_deploy_func.remote(lambda df: df.query(expr, **kwargs),
+                                       part) for part in self._df]
+
+        if inplace:
+            self._update_inplace(new_dfs)
+        else:
+            return DataFrame(new_dfs, self.columns)
 
     def radd(self, other, axis='columns', level=None, fill_value=None):
         raise NotImplementedError("Not Yet implemented.")
@@ -1703,7 +1783,12 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def __iter__(self):
-        raise NotImplementedError("Not Yet implemented.")
+        """Iterate over the columns
+
+        Returns:
+            An Iterator over the columns of the dataframe.
+        """
+        return iter(self.columns)
 
     def __contains__(self, key):
         return key in self.columns
@@ -1715,7 +1800,12 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def __abs__(self):
-        raise NotImplementedError("Not Yet implemented.")
+        """Creates a modified DataFrame by elementwise taking the absolute value
+
+        Returns:
+            A modified DataFrame
+        """
+        return self.abs()
 
     def __round__(self, decimals=0):
         raise NotImplementedError("Not Yet implemented.")
@@ -1794,10 +1884,20 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def __eq__(self, other):
-        raise NotImplementedError("Not Yet implemented.")
+        """Computes the equality of this DataFrame with another
+
+        Returns:
+            True, if the DataFrames are equal. False otherwise.
+        """
+        return self.equals(other)
 
     def __ne__(self, other):
-        raise NotImplementedError("Not Yet implemented.")
+        """Checks that this DataFrame is not equal to another
+
+        Returns:
+            True, if the DataFrames are not equal. False otherwise.
+        """
+        return not self.equals(other)
 
     def __add__(self, other):
         raise NotImplementedError("Not Yet implemented.")
@@ -1824,7 +1924,19 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def __neg__(self):
-        raise NotImplementedError("Not Yet implemented.")
+        """Computes an element wise negative DataFrame
+
+        Returns:
+            A modified DataFrame where every element is the negation of before
+        """
+        for t in self.dtypes:
+            if not (is_bool_dtype(t)
+                    or is_numeric_dtype(t)
+                    or is_timedelta64_dtype(t)):
+                raise TypeError("Unary negative expects numeric dtype, not {}"
+                                .format(t))
+
+        return self._map_partitions(lambda df: df.__neg__())
 
     def __floordiv__(self, other):
         raise NotImplementedError("Not Yet implemented.")
diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py
index 7dd63059b..741259e3b 100644
--- a/python/ray/dataframe/test/test_dataframe.py
+++ b/python/ray/dataframe/test/test_dataframe.py
@@ -160,6 +160,9 @@ def test_int_dataframe():
                  lambda x: x,
                  lambda x: False]
 
+    query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2',
+                   '(col2 > col1) and (col1 < col3)']
+
     keys = ['col1',
             'col2',
             'col3',
@@ -185,10 +188,14 @@ def test_int_dataframe():
     test_keys(ray_df, pandas_df)
     test_transpose(ray_df, pandas_df)
     test_round(ray_df, pandas_df)
+    test_query(ray_df, pandas_df, query_funcs)
 
     test_all(ray_df, pandas_df)
     test_any(ray_df, pandas_df)
     test___getitem__(ray_df, pandas_df)
+    test___neg__(ray_df, pandas_df)
+    test___iter__(ray_df, pandas_df)
+    test___abs__(ray_df, pandas_df)
     test___delitem__(ray_df, pandas_df)
     test___copy__(ray_df, pandas_df)
     test___deepcopy__(ray_df, pandas_df)
@@ -256,6 +263,9 @@ def test_float_dataframe():
                  lambda x: x,
                  lambda x: False]
 
+    query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2',
+                   '(col2 > col1) and (col1 < col3)']
+
     keys = ['col1',
             'col2',
             'col3',
@@ -281,10 +291,14 @@ def test_float_dataframe():
     test_keys(ray_df, pandas_df)
     test_transpose(ray_df, pandas_df)
     test_round(ray_df, pandas_df)
+    test_query(ray_df, pandas_df, query_funcs)
 
     test_all(ray_df, pandas_df)
     test_any(ray_df, pandas_df)
     test___getitem__(ray_df, pandas_df)
+    test___neg__(ray_df, pandas_df)
+    test___iter__(ray_df, pandas_df)
+    test___abs__(ray_df, pandas_df)
     test___delitem__(ray_df, pandas_df)
     test___copy__(ray_df, pandas_df)
     test___deepcopy__(ray_df, pandas_df)
@@ -346,6 +360,9 @@ def test_mixed_dtype_dataframe():
                  lambda x: x,
                  lambda x: False]
 
+    query_funcs = ['col1 < col2', 'col1 == col2',
+                   '(col2 > col1) and (col1 < col3)']
+
     keys = ['col1',
             'col2',
             'col3',
@@ -370,14 +387,21 @@ def test_mixed_dtype_dataframe():
 
     with pytest.raises(TypeError):
         test_abs(ray_df, pandas_df)
+        test___abs__(ray_df, pandas_df)
 
     test_keys(ray_df, pandas_df)
     test_transpose(ray_df, pandas_df)
     test_round(ray_df, pandas_df)
+    test_query(ray_df, pandas_df, query_funcs)
 
     test_all(ray_df, pandas_df)
     test_any(ray_df, pandas_df)
     test___getitem__(ray_df, pandas_df)
+
+    with pytest.raises(TypeError):
+        test___neg__(ray_df, pandas_df)
+
+    test___iter__(ray_df, pandas_df)
     test___delitem__(ray_df, pandas_df)
     test___copy__(ray_df, pandas_df)
     test___deepcopy__(ray_df, pandas_df)
@@ -442,6 +466,9 @@ def test_nan_dataframe():
                  lambda x: x,
                  lambda x: False]
 
+    query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2',
+                   '(col2 > col1) and (col1 < col3)']
+
     keys = ['col1',
             'col2',
             'col3',
@@ -467,10 +494,14 @@ def test_nan_dataframe():
     test_keys(ray_df, pandas_df)
     test_transpose(ray_df, pandas_df)
     test_round(ray_df, pandas_df)
+    test_query(ray_df, pandas_df, query_funcs)
 
     test_all(ray_df, pandas_df)
     test_any(ray_df, pandas_df)
     test___getitem__(ray_df, pandas_df)
+    test___neg__(ray_df, pandas_df)
+    test___iter__(ray_df, pandas_df)
+    test___abs__(ray_df, pandas_df)
     test___delitem__(ray_df, pandas_df)
     test___copy__(ray_df, pandas_df)
     test___deepcopy__(ray_df, pandas_df)
@@ -828,10 +859,19 @@ def test_eq():
 
 
 def test_equals():
-    ray_df = create_test_dataframe()
+    pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
+                               'col2': [2, 3, 4, 1]})
+    ray_df1 = rdf.from_pandas(pandas_df1, 2)
+    ray_df2 = rdf.from_pandas(pandas_df1, 3)
 
-    with pytest.raises(NotImplementedError):
-        ray_df.equals(None)
+    assert ray_df1.equals(ray_df2)
+
+    pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
+                               'col2': [2, 3, 5, 1]})
+    ray_df3 = rdf.from_pandas(pandas_df2, 4)
+
+    assert not ray_df3.equals(ray_df1)
+    assert not ray_df3.equals(ray_df2)
 
 
 def test_eval():
@@ -1306,11 +1346,12 @@ def test_quantile():
         ray_df.quantile()
 
 
-def test_query():
-    ray_df = create_test_dataframe()
+@pytest.fixture
+def test_query(ray_df, pandas_df, funcs):
 
-    with pytest.raises(NotImplementedError):
-        ray_df.query(None)
+    for f in funcs:
+        pandas_df_new, ray_df_new = pandas_df.query(f), ray_df.query(f)
+        assert pandas_df_new.equals(rdf.to_pandas(ray_df_new))
 
 
 def test_radd():
@@ -1885,11 +1926,10 @@ def test___unicode__():
         ray_df.__unicode__()
 
 
-def test___neg__():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.__neg__()
+@pytest.fixture
+def test___neg__(ray_df, pd_df):
+    ray_df_neg = ray_df.__neg__()
+    assert pd_df.__neg__().equals(rdf.to_pandas(ray_df_neg))
 
 
 def test___invert__():
@@ -1906,11 +1946,16 @@ def test___hash__():
         ray_df.__hash__()
 
 
-def test___iter__():
-    ray_df = create_test_dataframe()
+@pytest.fixture
+def test___iter__(ray_df, pd_df):
+    ray_iterator = ray_df.__iter__()
 
-    with pytest.raises(NotImplementedError):
-        ray_df.__iter__()
+    # Check that ray_iterator implements the iterator interface
+    assert hasattr(ray_iterator, '__iter__')
+    assert hasattr(ray_iterator, 'next') or hasattr(ray_iterator, '__next__')
+
+    pd_iterator = pd_df.__iter__()
+    assert list(ray_iterator) == list(pd_iterator)
 
 
 @pytest.fixture
@@ -1933,11 +1978,9 @@ def test___bool__():
         ray_df.__bool__()
 
 
-def test___abs__():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.__abs__()
+@pytest.fixture
+def test___abs__(ray_df, pandas_df):
+    assert(ray_df_equals_pandas(abs(ray_df), abs(pandas_df)))
 
 
 def test___round__():