[DataFrame] Implement loc, iloc (#1612)

* Add parquet-cpp to gitignore

* Add read_csv and read_parquet

* Gitignore pytest_cache

* Fix flake8

* Add io to __init__

* Changing Index. Currently running tests, but so far untested.

* Removing issue of reassigning DF in from_pandas

* Fixing lint

* Fix bug

* Fix bug

* Fix bug

* Better performance

* Fixing index issue with sum

* Address comments

* Update io with index

* Updating performance and implementation. Adding tests

* Fixing off-by-1

* Fix lint

* Address Comments

* Make pop compatible with new to_pandas

* Format Code

* Cleanup some index issue

* Bug fix: assigned reset_index back

* Implement loc and iloc

* Revert whitespace

* Format code

* Address comments
This commit is contained in:
Simon Mo
2018-02-27 01:57:52 -08:00
committed by Devin Petersohn
parent b79597dc00
commit 4ab16d7fb3
3 changed files with 169 additions and 23 deletions
+18 -5
View File
@@ -1191,7 +1191,6 @@ class DataFrame(object):
A Series containing the popped values. Also modifies this
DataFrame.
"""
popped = to_pandas(self._map_partitions(
lambda df: df.pop(item)))
self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df
@@ -1959,8 +1958,15 @@ class DataFrame(object):
def __rsub__(other, axis=None, level=None, fill_value=None):
raise NotImplementedError("Not Yet implemented.")
def loc(axis=None):
raise NotImplementedError("Not Yet implemented.")
@property
def loc(self):
"""Purely label-location based indexer for selection by label.
We currently support: single label, list array, slice object
We do not support: boolean array, callable
"""
from .indexing import _Loc_Indexer
return _Loc_Indexer(self)
@property
def is_copy(self):
@@ -1978,8 +1984,15 @@ class DataFrame(object):
def ix(axis=None):
raise NotImplementedError("Not Yet implemented.")
def iloc(axis=None):
raise NotImplementedError("Not Yet implemented.")
@property
def iloc(self):
"""Purely integer-location based indexing for selection by position.
We currently support: single label, list array, slice object
We do not support: boolean array, callable
"""
from .indexing import _iLoc_Indexer
return _iLoc_Indexer(self)
def _get_lengths(df):
+103
View File
@@ -0,0 +1,103 @@
import pandas as pd
import ray
from .dataframe import _deploy_func
class _Location_Indexer_Base():
"""Base class for location indexer like loc and iloc
This class abstract away commonly used method
"""
def __init__(self, ray_df):
self.df = ray_df
def __getitem__(self, key):
if not isinstance(key, tuple):
# The one argument case is equivalent to full slice in 2nd dim.
return self.locate_2d(key, slice(None))
else:
return self.locate_2d(*key)
def _get_lookup_dict(self, ray_partition_idx):
if ray_partition_idx.ndim == 1: # Single row matched
position = (ray_partition_idx['partition'],
ray_partition_idx['index_within_partition'])
rows_to_lookup = {position[0]: [position[1]]}
if ray_partition_idx.ndim == 2: # Multiple rows matched
# We copy ray_partition_idx because it allows us to
# do groupby. This might not be the most efficient method.
# And have room to optimize.
ray_partition_idx = ray_partition_idx.copy()
rows_to_lookup = ray_partition_idx.groupby('partition').aggregate(
lambda x: list(x)).to_dict()['index_within_partition']
return rows_to_lookup
def locate_2d(self, row_label, col_label):
pass
def _map_partition(self, lookup_dict, col_lst, indexer='loc'):
"""Apply retrieval function to a lookup_dict
in the form of {partition_id: [idx]}.
Returns:
retrieved_rows_remote: a list of object ids for pd_df
"""
assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc"
if indexer == 'loc':
def retrieve_func(df, idx_lst, col_label):
return df.loc[idx_lst, col_label]
elif indexer == 'iloc':
def retrieve_func(df, idx_lst, col_idx):
return df.iloc[idx_lst, col_idx]
retrieved_rows_remote = [
_deploy_func.remote(retrieve_func, self.df._df[partition],
idx_to_lookup, col_lst)
for partition, idx_to_lookup in lookup_dict.items()
]
return retrieved_rows_remote
class _Loc_Indexer(_Location_Indexer_Base):
"""A indexer for ray_df.loc[] functionality"""
def locate_2d(self, row_label, col_label):
index_loc = self.df._index.loc[row_label]
lookup_dict = self._get_lookup_dict(index_loc)
retrieved_rows_remote = self._map_partition(
lookup_dict, col_label, indexer='loc')
joined_df = pd.concat(ray.get(retrieved_rows_remote))
if index_loc.ndim == 2:
# The returned result need to be indexed series/df
# Re-index is needed.
joined_df.index = index_loc.index
if isinstance(row_label, int) or isinstance(row_label, str):
return joined_df.squeeze(axis=0)
else:
return joined_df
class _iLoc_Indexer(_Location_Indexer_Base):
"""A indexer for ray_df.iloc[] functionality"""
def locate_2d(self, row_idx, col_idx):
index_loc = self.df._index.iloc[row_idx]
lookup_dict = self._get_lookup_dict(index_loc)
retrieved_rows_remote = self._map_partition(
lookup_dict, col_idx, indexer='iloc')
joined_df = pd.concat(ray.get(retrieved_rows_remote))
if index_loc.ndim == 2:
# The returned result need to be indexed series/df
# Re-index is needed.
joined_df.index = index_loc.index
if isinstance(row_idx, int) or isinstance(row_idx, str):
return joined_df.squeeze(axis=0)
else:
return joined_df
+48 -18
View File
@@ -223,6 +223,9 @@ def test_int_dataframe():
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
test_loc(ray_df, pandas_df)
test_iloc(ray_df, pandas_df)
labels = ['a', 'b', 'c', 'd']
test_set_axis(ray_df, pandas_df, labels, 0)
test_set_axis(ray_df, pandas_df, labels, 'rows')
@@ -325,6 +328,9 @@ def test_float_dataframe():
test_iteritems(ray_df, pandas_df)
test_itertuples(ray_df, pandas_df)
test_loc(ray_df, pandas_df)
test_iloc(ray_df, pandas_df)
labels = ['a', 'b', 'c', 'd']
test_set_axis(ray_df, pandas_df, labels, 0)
test_set_axis(ray_df, pandas_df, labels, 'rows')
@@ -348,10 +354,10 @@ def test_float_dataframe():
def test_mixed_dtype_dataframe():
pandas_df = pd.DataFrame({
'col1': [1, 2, 3, 4],
'col2': [4, 5, 6, 7],
'col3': [8.0, 9.4, 10.1, 11.3],
'col4': ['a', 'b', 'c', 'd']})
'col1': [1, 2, 3, 4],
'col2': [4, 5, 6, 7],
'col3': [8.0, 9.4, 10.1, 11.3],
'col4': ['a', 'b', 'c', 'd']})
ray_df = rdf.from_pandas(pandas_df, 2)
@@ -432,6 +438,9 @@ def test_mixed_dtype_dataframe():
test_iteritems(ray_df, pandas_df)
test_itertuples(ray_df, pandas_df)
test_loc(ray_df, pandas_df)
test_iloc(ray_df, pandas_df)
labels = ['a', 'b', 'c', 'd']
test_set_axis(ray_df, pandas_df, labels, 0)
test_set_axis(ray_df, pandas_df, labels, 'rows')
@@ -454,10 +463,10 @@ def test_mixed_dtype_dataframe():
def test_nan_dataframe():
pandas_df = pd.DataFrame({
'col1': [1, 2, 3, np.nan],
'col2': [4, 5, np.nan, 7],
'col3': [8, np.nan, 10, 11],
'col4': [np.nan, 13, 14, 15]})
'col1': [1, 2, 3, np.nan],
'col2': [4, 5, np.nan, 7],
'col3': [8, np.nan, 10, 11],
'col4': [np.nan, 13, 14, 15]})
ray_df = rdf.from_pandas(pandas_df, 2)
@@ -528,6 +537,9 @@ def test_nan_dataframe():
test_iteritems(ray_df, pandas_df)
test_itertuples(ray_df, pandas_df)
test_loc(ray_df, pandas_df)
test_iloc(ray_df, pandas_df)
labels = ['a', 'b', 'c', 'd']
test_set_axis(ray_df, pandas_df, labels, 0)
test_set_axis(ray_df, pandas_df, labels, 'rows')
@@ -860,14 +872,14 @@ def test_eq():
def test_equals():
pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
'col2': [2, 3, 4, 1]})
'col2': [2, 3, 4, 1]})
ray_df1 = rdf.from_pandas(pandas_df1, 2)
ray_df2 = rdf.from_pandas(pandas_df1, 3)
assert ray_df1.equals(ray_df2)
pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
'col2': [2, 3, 5, 1]})
'col2': [2, 3, 5, 1]})
ray_df3 = rdf.from_pandas(pandas_df2, 4)
assert not ray_df3.equals(ray_df1)
@@ -2074,11 +2086,20 @@ def test___rsub__():
ray_df.__rsub__(None, None, None)
def test_loc():
ray_df = create_test_dataframe()
@pytest.fixture
def test_loc(ray_df, pd_df):
# Singleton
assert ray_df.loc[0].equals(pd_df.loc[0])
assert ray_df.loc[0, 'col1'] == pd_df.loc[0, 'col1']
with pytest.raises(NotImplementedError):
ray_df.loc()
# List
assert ray_df.loc[[1, 2]].equals(pd_df.loc[[1, 2]])
assert ray_df.loc[[1, 2], ['col1']].equals(pd_df.loc[[1, 2], ['col1']])
# Slice
assert ray_df.loc[1:, 'col1'].equals(pd_df.loc[1:, 'col1'])
assert ray_df.loc[1:2, 'col1'].equals(pd_df.loc[1:2, 'col1'])
assert ray_df.loc[1:2, 'col1':'col2'].equals(pd_df.loc[1:2, 'col1':'col2'])
def test_is_copy():
@@ -2116,8 +2137,17 @@ def test_ix():
ray_df.ix()
def test_iloc():
ray_df = create_test_dataframe()
@pytest.fixture
def test_iloc(ray_df, pd_df):
# Singleton
assert ray_df.iloc[0].equals(pd_df.iloc[0])
assert ray_df.iloc[0, 1] == pd_df.iloc[0, 1]
with pytest.raises(NotImplementedError):
ray_df.iloc()
# List
assert ray_df.iloc[[1, 2]].equals(pd_df.iloc[[1, 2]])
assert ray_df.iloc[[1, 2], [1, 0]].equals(pd_df.iloc[[1, 2], [1, 0]])
# Slice
assert ray_df.iloc[1:, 0].equals(pd_df.iloc[1:, 0])
assert ray_df.iloc[1:2, 0].equals(pd_df.iloc[1:2, 0])
assert ray_df.iloc[1:2, 0:2].equals(pd_df.iloc[1:2, 0:2])