From 4ab16d7fb32f63559b9b58ccb07e06df392f9ae0 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Tue, 27 Feb 2018 01:57:52 -0800 Subject: [PATCH] [DataFrame] Implement loc, iloc (#1612) * Add parquet-cpp to gitignore * Add read_csv and read_parquet * Gitignore pytest_cache * Fix flake8 * Add io to __init__ * Changing Index. Currently running tests, but so far untested. * Removing issue of reassigning DF in from_pandas * Fixing lint * Fix bug * Fix bug * Fix bug * Better performance * Fixing index issue with sum * Address comments * Update io with index * Updating performance and implementation. Adding tests * Fixing off-by-1 * Fix lint * Address Comments * Make pop compatible with new to_pandas * Format Code * Cleanup some index issue * Bug fix: assigned reset_index back * Implement loc and iloc * Revert whitespace * Format code * Address comments --- python/ray/dataframe/dataframe.py | 23 ++++- python/ray/dataframe/indexing.py | 103 ++++++++++++++++++++ python/ray/dataframe/test/test_dataframe.py | 66 +++++++++---- 3 files changed, 169 insertions(+), 23 deletions(-) create mode 100644 python/ray/dataframe/indexing.py diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 7ecf89a84..d3fb4c98f 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -1191,7 +1191,6 @@ class DataFrame(object): A Series containing the popped values. Also modifies this DataFrame. """ - popped = to_pandas(self._map_partitions( lambda df: df.pop(item))) self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df @@ -1959,8 +1958,15 @@ class DataFrame(object): def __rsub__(other, axis=None, level=None, fill_value=None): raise NotImplementedError("Not Yet implemented.") - def loc(axis=None): - raise NotImplementedError("Not Yet implemented.") + @property + def loc(self): + """Purely label-location based indexer for selection by label. + + We currently support: single label, list array, slice object + We do not support: boolean array, callable + """ + from .indexing import _Loc_Indexer + return _Loc_Indexer(self) @property def is_copy(self): @@ -1978,8 +1984,15 @@ class DataFrame(object): def ix(axis=None): raise NotImplementedError("Not Yet implemented.") - def iloc(axis=None): - raise NotImplementedError("Not Yet implemented.") + @property + def iloc(self): + """Purely integer-location based indexing for selection by position. + + We currently support: single label, list array, slice object + We do not support: boolean array, callable + """ + from .indexing import _iLoc_Indexer + return _iLoc_Indexer(self) def _get_lengths(df): diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py new file mode 100644 index 000000000..aa94fd565 --- /dev/null +++ b/python/ray/dataframe/indexing.py @@ -0,0 +1,103 @@ +import pandas as pd +import ray +from .dataframe import _deploy_func + + +class _Location_Indexer_Base(): + """Base class for location indexer like loc and iloc + This class abstract away commonly used method + """ + + def __init__(self, ray_df): + self.df = ray_df + + def __getitem__(self, key): + if not isinstance(key, tuple): + # The one argument case is equivalent to full slice in 2nd dim. + return self.locate_2d(key, slice(None)) + else: + return self.locate_2d(*key) + + def _get_lookup_dict(self, ray_partition_idx): + if ray_partition_idx.ndim == 1: # Single row matched + position = (ray_partition_idx['partition'], + ray_partition_idx['index_within_partition']) + rows_to_lookup = {position[0]: [position[1]]} + if ray_partition_idx.ndim == 2: # Multiple rows matched + # We copy ray_partition_idx because it allows us to + # do groupby. This might not be the most efficient method. + # And have room to optimize. + ray_partition_idx = ray_partition_idx.copy() + rows_to_lookup = ray_partition_idx.groupby('partition').aggregate( + lambda x: list(x)).to_dict()['index_within_partition'] + return rows_to_lookup + + def locate_2d(self, row_label, col_label): + pass + + def _map_partition(self, lookup_dict, col_lst, indexer='loc'): + """Apply retrieval function to a lookup_dict + in the form of {partition_id: [idx]}. + + Returns: + retrieved_rows_remote: a list of object ids for pd_df + """ + assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc" + + if indexer == 'loc': + + def retrieve_func(df, idx_lst, col_label): + return df.loc[idx_lst, col_label] + elif indexer == 'iloc': + + def retrieve_func(df, idx_lst, col_idx): + return df.iloc[idx_lst, col_idx] + + retrieved_rows_remote = [ + _deploy_func.remote(retrieve_func, self.df._df[partition], + idx_to_lookup, col_lst) + for partition, idx_to_lookup in lookup_dict.items() + ] + return retrieved_rows_remote + + +class _Loc_Indexer(_Location_Indexer_Base): + """A indexer for ray_df.loc[] functionality""" + + def locate_2d(self, row_label, col_label): + index_loc = self.df._index.loc[row_label] + lookup_dict = self._get_lookup_dict(index_loc) + retrieved_rows_remote = self._map_partition( + lookup_dict, col_label, indexer='loc') + joined_df = pd.concat(ray.get(retrieved_rows_remote)) + + if index_loc.ndim == 2: + # The returned result need to be indexed series/df + # Re-index is needed. + joined_df.index = index_loc.index + + if isinstance(row_label, int) or isinstance(row_label, str): + return joined_df.squeeze(axis=0) + else: + return joined_df + + +class _iLoc_Indexer(_Location_Indexer_Base): + """A indexer for ray_df.iloc[] functionality""" + + def locate_2d(self, row_idx, col_idx): + index_loc = self.df._index.iloc[row_idx] + lookup_dict = self._get_lookup_dict(index_loc) + retrieved_rows_remote = self._map_partition( + lookup_dict, col_idx, indexer='iloc') + joined_df = pd.concat(ray.get(retrieved_rows_remote)) + + if index_loc.ndim == 2: + # The returned result need to be indexed series/df + # Re-index is needed. + joined_df.index = index_loc.index + + if isinstance(row_idx, int) or isinstance(row_idx, str): + return joined_df.squeeze(axis=0) + else: + return joined_df diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 741259e3b..31087066a 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -223,6 +223,9 @@ def test_int_dataframe(): test_notna(ray_df, pandas_df) test_notnull(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] test_set_axis(ray_df, pandas_df, labels, 0) test_set_axis(ray_df, pandas_df, labels, 'rows') @@ -325,6 +328,9 @@ def test_float_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] test_set_axis(ray_df, pandas_df, labels, 0) test_set_axis(ray_df, pandas_df, labels, 'rows') @@ -348,10 +354,10 @@ def test_float_dataframe(): def test_mixed_dtype_dataframe(): pandas_df = pd.DataFrame({ - 'col1': [1, 2, 3, 4], - 'col2': [4, 5, 6, 7], - 'col3': [8.0, 9.4, 10.1, 11.3], - 'col4': ['a', 'b', 'c', 'd']}) + 'col1': [1, 2, 3, 4], + 'col2': [4, 5, 6, 7], + 'col3': [8.0, 9.4, 10.1, 11.3], + 'col4': ['a', 'b', 'c', 'd']}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -432,6 +438,9 @@ def test_mixed_dtype_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] test_set_axis(ray_df, pandas_df, labels, 0) test_set_axis(ray_df, pandas_df, labels, 'rows') @@ -454,10 +463,10 @@ def test_mixed_dtype_dataframe(): def test_nan_dataframe(): pandas_df = pd.DataFrame({ - 'col1': [1, 2, 3, np.nan], - 'col2': [4, 5, np.nan, 7], - 'col3': [8, np.nan, 10, 11], - 'col4': [np.nan, 13, 14, 15]}) + 'col1': [1, 2, 3, np.nan], + 'col2': [4, 5, np.nan, 7], + 'col3': [8, np.nan, 10, 11], + 'col4': [np.nan, 13, 14, 15]}) ray_df = rdf.from_pandas(pandas_df, 2) @@ -528,6 +537,9 @@ def test_nan_dataframe(): test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) + test_loc(ray_df, pandas_df) + test_iloc(ray_df, pandas_df) + labels = ['a', 'b', 'c', 'd'] test_set_axis(ray_df, pandas_df, labels, 0) test_set_axis(ray_df, pandas_df, labels, 'rows') @@ -860,14 +872,14 @@ def test_eq(): def test_equals(): pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3], - 'col2': [2, 3, 4, 1]}) + 'col2': [2, 3, 4, 1]}) ray_df1 = rdf.from_pandas(pandas_df1, 2) ray_df2 = rdf.from_pandas(pandas_df1, 3) assert ray_df1.equals(ray_df2) pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3], - 'col2': [2, 3, 5, 1]}) + 'col2': [2, 3, 5, 1]}) ray_df3 = rdf.from_pandas(pandas_df2, 4) assert not ray_df3.equals(ray_df1) @@ -2074,11 +2086,20 @@ def test___rsub__(): ray_df.__rsub__(None, None, None) -def test_loc(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_loc(ray_df, pd_df): + # Singleton + assert ray_df.loc[0].equals(pd_df.loc[0]) + assert ray_df.loc[0, 'col1'] == pd_df.loc[0, 'col1'] - with pytest.raises(NotImplementedError): - ray_df.loc() + # List + assert ray_df.loc[[1, 2]].equals(pd_df.loc[[1, 2]]) + assert ray_df.loc[[1, 2], ['col1']].equals(pd_df.loc[[1, 2], ['col1']]) + + # Slice + assert ray_df.loc[1:, 'col1'].equals(pd_df.loc[1:, 'col1']) + assert ray_df.loc[1:2, 'col1'].equals(pd_df.loc[1:2, 'col1']) + assert ray_df.loc[1:2, 'col1':'col2'].equals(pd_df.loc[1:2, 'col1':'col2']) def test_is_copy(): @@ -2116,8 +2137,17 @@ def test_ix(): ray_df.ix() -def test_iloc(): - ray_df = create_test_dataframe() +@pytest.fixture +def test_iloc(ray_df, pd_df): + # Singleton + assert ray_df.iloc[0].equals(pd_df.iloc[0]) + assert ray_df.iloc[0, 1] == pd_df.iloc[0, 1] - with pytest.raises(NotImplementedError): - ray_df.iloc() + # List + assert ray_df.iloc[[1, 2]].equals(pd_df.iloc[[1, 2]]) + assert ray_df.iloc[[1, 2], [1, 0]].equals(pd_df.iloc[[1, 2], [1, 0]]) + + # Slice + assert ray_df.iloc[1:, 0].equals(pd_df.iloc[1:, 0]) + assert ray_df.iloc[1:2, 0].equals(pd_df.iloc[1:2, 0]) + assert ray_df.iloc[1:2, 0:2].equals(pd_df.iloc[1:2, 0:2])