[DataFrame] Added Implementations for equals, query, and some other operations (#1610)

* Implemented Dataframe __abs__ and __iter__

* implemented __neg__

* implemented query

* Implemented equals

* Implemented __eq__ and __ne__ operators

* Added method level comments

* resolved flake8 comments

* resolving devin's comments
This commit is contained in:
Kunal Gosar
2018-02-26 18:31:00 -08:00
committed by Devin Petersohn
parent d78a22f94c
commit 48bd7b147d
2 changed files with 170 additions and 30 deletions
+106 -9
View File
@@ -9,6 +9,10 @@ from pandas.core.index import _ensure_index_from_sequences
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas.compat import lzip
from pandas.core.dtypes.common import (
is_bool_dtype,
is_numeric_dtype,
is_timedelta64_dtype)
import warnings
import numpy as np
@@ -31,8 +35,7 @@ class DataFrame(object):
assert(len(df) > 0)
self._df = df
self._lengths = [_deploy_func.remote(_get_lengths, d)
for d in self._df]
self._compute_lengths()
self.columns = columns
# this _index object is a pd.DataFrame
@@ -80,6 +83,12 @@ class DataFrame(object):
index = property(_get_index, _set_index)
def _compute_lengths(self):
"""Updates the stored lengths of DataFrame partions
"""
self._lengths = [_deploy_func.remote(_get_lengths, d)
for d in self._df]
def _get_lengths(self):
"""Gets the lengths for each partition and caches it if it wasn't.
@@ -200,6 +209,21 @@ class DataFrame(object):
return DataFrame(new_df, self.columns, index=index)
def _update_inplace(self, df=None, columns=None, index=None):
"""Updates the current DataFrame inplace
"""
assert(len(df) > 0)
if df:
self._df = df
if columns:
self.columns = columns
if index:
self.index = index
self._compute_lengths()
self._index = self._default_index()
def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
@@ -630,7 +654,37 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.")
def equals(self, other):
raise NotImplementedError("Not Yet implemented.")
"""
Checks if other DataFrame is elementwise equal to the current one
Returns:
Boolean: True if equal, otherwise False
"""
def helper(df, index, other_series):
return df.iloc[index['index_within_partition']] \
.equals(other_series)
results = []
other_partition = None
other_df = None
for i, idx in other._index.iterrows():
if idx['partition'] != other_partition:
other_df = ray.get(other._df[idx['partition']])
other_partition = idx['partition']
# TODO: group series here into full df partitions to reduce
# the number of remote calls to helper
other_series = other_df.iloc[idx['index_within_partition']]
curr_index = self._index.iloc[i]
curr_df = self._df[int(curr_index['partition'])]
results.append(_deploy_func.remote(helper,
curr_df,
curr_index,
other_series))
for r in results:
if not ray.get(r):
return False
return True
def eval(self, expr, inplace=False, **kwargs):
raise NotImplementedError("Not Yet implemented.")
@@ -1154,7 +1208,18 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.")
def query(self, expr, inplace=False, **kwargs):
raise NotImplementedError("Not Yet implemented.")
"""Queries the Dataframe with a boolean expression
Returns:
A new DataFrame if inplace=False
"""
new_dfs = [_deploy_func.remote(lambda df: df.query(expr, **kwargs),
part) for part in self._df]
if inplace:
self._update_inplace(new_dfs)
else:
return DataFrame(new_dfs, self.columns)
def radd(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError("Not Yet implemented.")
@@ -1703,7 +1768,12 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.")
def __iter__(self):
raise NotImplementedError("Not Yet implemented.")
"""Iterate over the columns
Returns:
An Iterator over the columns of the dataframe.
"""
return iter(self.columns)
def __contains__(self, key):
return key in self.columns
@@ -1715,7 +1785,12 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.")
def __abs__(self):
raise NotImplementedError("Not Yet implemented.")
"""Creates a modified DataFrame by elementwise taking the absolute value
Returns:
A modified DataFrame
"""
return self.abs()
def __round__(self, decimals=0):
raise NotImplementedError("Not Yet implemented.")
@@ -1794,10 +1869,20 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.")
def __eq__(self, other):
raise NotImplementedError("Not Yet implemented.")
"""Computes the equality of this DataFrame with another
Returns:
True, if the DataFrames are equal. False otherwise.
"""
return self.equals(other)
def __ne__(self, other):
raise NotImplementedError("Not Yet implemented.")
"""Checks that this DataFrame is not equal to another
Returns:
True, if the DataFrames are not equal. False otherwise.
"""
return not self.equals(other)
def __add__(self, other):
raise NotImplementedError("Not Yet implemented.")
@@ -1824,7 +1909,19 @@ class DataFrame(object):
raise NotImplementedError("Not Yet implemented.")
def __neg__(self):
raise NotImplementedError("Not Yet implemented.")
"""Computes an element wise negative DataFrame
Returns:
A modified DataFrame where every element is the negation of before
"""
for t in self.dtypes:
if not (is_bool_dtype(t)
or is_numeric_dtype(t)
or is_timedelta64_dtype(t)):
raise TypeError("Unary negative expects numeric dtype, not {}"
.format(t))
return self._map_partitions(lambda df: df.__neg__())
def __floordiv__(self, other):
raise NotImplementedError("Not Yet implemented.")
+64 -21
View File
@@ -160,6 +160,9 @@ def test_int_dataframe():
lambda x: x,
lambda x: False]
query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2',
'(col2 > col1) and (col1 < col3)']
keys = ['col1',
'col2',
'col3',
@@ -185,10 +188,14 @@ def test_int_dataframe():
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_query(ray_df, pandas_df, query_funcs)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___neg__(ray_df, pandas_df)
test___iter__(ray_df, pandas_df)
test___abs__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
@@ -256,6 +263,9 @@ def test_float_dataframe():
lambda x: x,
lambda x: False]
query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2',
'(col2 > col1) and (col1 < col3)']
keys = ['col1',
'col2',
'col3',
@@ -281,10 +291,14 @@ def test_float_dataframe():
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_query(ray_df, pandas_df, query_funcs)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___neg__(ray_df, pandas_df)
test___iter__(ray_df, pandas_df)
test___abs__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
@@ -346,6 +360,9 @@ def test_mixed_dtype_dataframe():
lambda x: x,
lambda x: False]
query_funcs = ['col1 < col2', 'col1 == col2',
'(col2 > col1) and (col1 < col3)']
keys = ['col1',
'col2',
'col3',
@@ -370,14 +387,21 @@ def test_mixed_dtype_dataframe():
with pytest.raises(TypeError):
test_abs(ray_df, pandas_df)
test___abs__(ray_df, pandas_df)
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_query(ray_df, pandas_df, query_funcs)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
with pytest.raises(TypeError):
test___neg__(ray_df, pandas_df)
test___iter__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
@@ -442,6 +466,9 @@ def test_nan_dataframe():
lambda x: x,
lambda x: False]
query_funcs = ['col1 < col2', 'col3 > col4', 'col1 == col2',
'(col2 > col1) and (col1 < col3)']
keys = ['col1',
'col2',
'col3',
@@ -467,10 +494,14 @@ def test_nan_dataframe():
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_query(ray_df, pandas_df, query_funcs)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___neg__(ray_df, pandas_df)
test___iter__(ray_df, pandas_df)
test___abs__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
@@ -828,10 +859,19 @@ def test_eq():
def test_equals():
ray_df = create_test_dataframe()
pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
'col2': [2, 3, 4, 1]})
ray_df1 = rdf.from_pandas(pandas_df1, 2)
ray_df2 = rdf.from_pandas(pandas_df1, 3)
with pytest.raises(NotImplementedError):
ray_df.equals(None)
assert ray_df1.equals(ray_df2)
pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
'col2': [2, 3, 5, 1]})
ray_df3 = rdf.from_pandas(pandas_df2, 4)
assert not ray_df3.equals(ray_df1)
assert not ray_df3.equals(ray_df2)
def test_eval():
@@ -1306,11 +1346,12 @@ def test_quantile():
ray_df.quantile()
def test_query():
ray_df = create_test_dataframe()
@pytest.fixture
def test_query(ray_df, pandas_df, funcs):
with pytest.raises(NotImplementedError):
ray_df.query(None)
for f in funcs:
pandas_df_new, ray_df_new = pandas_df.query(f), ray_df.query(f)
assert pandas_df_new.equals(rdf.to_pandas(ray_df_new))
def test_radd():
@@ -1885,11 +1926,10 @@ def test___unicode__():
ray_df.__unicode__()
def test___neg__():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.__neg__()
@pytest.fixture
def test___neg__(ray_df, pd_df):
ray_df_neg = ray_df.__neg__()
assert pd_df.__neg__().equals(rdf.to_pandas(ray_df_neg))
def test___invert__():
@@ -1906,11 +1946,16 @@ def test___hash__():
ray_df.__hash__()
def test___iter__():
ray_df = create_test_dataframe()
@pytest.fixture
def test___iter__(ray_df, pd_df):
ray_iterator = ray_df.__iter__()
with pytest.raises(NotImplementedError):
ray_df.__iter__()
# Check that ray_iterator implements the iterator interface
assert hasattr(ray_iterator, '__iter__')
assert hasattr(ray_iterator, 'next') or hasattr(ray_iterator, '__next__')
pd_iterator = pd_df.__iter__()
assert list(ray_iterator) == list(pd_iterator)
@pytest.fixture
@@ -1933,11 +1978,9 @@ def test___bool__():
ray_df.__bool__()
def test___abs__():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.__abs__()
@pytest.fixture
def test___abs__(ray_df, pandas_df):
assert(ray_df_equals_pandas(abs(ray_df), abs(pandas_df)))
def test___round__():