diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index dbcfa311b..41924d801 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -5027,9 +5027,8 @@ class DataFrame(object): We currently support: single label, list array, slice object We do not support: boolean array, callable """ - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + from .indexing import _Loc_Indexer + return _Loc_Indexer(self) @property def is_copy(self): @@ -5054,9 +5053,8 @@ class DataFrame(object): We currently support: single label, list array, slice object We do not support: boolean array, callable """ - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + from .indexing import _iLoc_Indexer + return _iLoc_Indexer(self) def _copartition(self, other, new_index): """Colocates the values of other with this for certain operations. diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 4e86e05b1..11c23d885 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -382,3 +382,13 @@ class _IndexMetadata(object): key: slice to convert and check """ return convert_to_index_sliceable(self._coord_df, key) + + def get_partition(self, partition_id): + """Return a view of coord_df where partition = partition_id + """ + return self._coord_df[self._coord_df.partition == partition_id] + + def sorted_index(self): + return (self._coord_df + .sort_values(['partition', 'index_within_partition']) + .index) diff --git a/python/ray/dataframe/indexing.py b/python/ray/dataframe/indexing.py index cba4ff872..9df3cdffb 100644 --- a/python/ray/dataframe/indexing.py +++ b/python/ray/dataframe/indexing.py @@ -1,104 +1,458 @@ +"""Indexing Helper Class works as follows: + +_Location_Indexer_Base provide methods framework for __getitem__ + and __setitem__ that work with Ray DataFrame's internal index. Base + class's __{get,set}item__ takes in partitions & idx_in_partition data + and perform lookup/item write. + +_LocIndexer and _iLocIndexer is responsible for indexer specific logic and + lookup computation. Loc will take care of enlarge dataframe. Both indexer + will take care of translating pandas's lookup to Ray DataFrame's internal + lookup. + +An illustration is available at +https://github.com/ray-project/ray/pull/1955#issuecomment-386781826 +""" import pandas as pd +import numpy as np import ray -from .dataframe import _deploy_func +from warnings import warn + +from pandas.api.types import (is_scalar, is_list_like, is_bool) +from pandas.core.dtypes.common import is_integer +from pandas.core.indexing import IndexingError + +from .utils import (_blocks_to_col, _get_nan_block_id, extractor, + _mask_block_partitions, writer) +from .index_metadata import _IndexMetadata +from .dataframe import DataFrame + + +def is_slice(x): return isinstance(x, slice) + + +def is_2d(x): return is_list_like(x) or is_slice(x) + + +def is_tuple(x): return isinstance(x, tuple) + + +def is_boolean_array(x): return is_list_like(x) and all(map(is_bool, x)) + + +def is_integer_slice(x): + if not is_slice(x): + return False + for pos in [x.start, x.stop, x.step]: + if not ((pos is None) or is_integer(pos)): + return False # one position is neither None nor int + return True + + +_ENLARGEMENT_WARNING = """ +Passing list-likes to .loc or [] with any missing label will raise +KeyError in the future, you can use .reindex() as an alternative. + +See the documentation here: +http://pandas.pydata.org/pandas-docs/stable/indexing.html#deprecate-loc-reindex-listlike +""" + +_ILOC_INT_ONLY_ERROR = """ +Location based indexing can only have [integer, integer slice (START point is +INCLUDED, END point is EXCLUDED), listlike of integers, boolean array] types. +""" + + +def _parse_tuple(tup): + """Unpack the user input for getitem and setitem and compute ndim + + loc[a] -> ([a], :), 1D + loc[[a,b],] -> ([a,b], :), + loc[a,b] -> ([a], [b]), 0D + """ + row_loc, col_loc = slice(None), slice(None) + + if is_tuple(tup): + row_loc = tup[0] + if len(tup) == 2: + col_loc = tup[1] + if len(tup) > 2: + raise IndexingError('Too many indexers') + else: + row_loc = tup + + ndim = _compute_ndim(row_loc, col_loc) + row_loc = [row_loc] if is_scalar(row_loc) else row_loc + col_loc = [col_loc] if is_scalar(col_loc) else col_loc + + return row_loc, col_loc, ndim + + +def _is_enlargement(locator, coord_df): + """Determine if a locator will enlarge the corrd_df. + + Enlargement happens when you trying to locate using labels isn't in the + original index. In other words, enlargement == adding NaNs ! + """ + if is_list_like(locator) and not is_slice( + locator) and len(locator) > 0 and not is_boolean_array(locator): + n_diff_elems = len(pd.Index(locator).difference(coord_df.index)) + is_enlargement_boolean = n_diff_elems > 0 + return is_enlargement_boolean + return False + + +def _warn_enlargement(): + warn(FutureWarning(_ENLARGEMENT_WARNING)) + + +def _compute_ndim(row_loc, col_loc): + """Compute the ndim of result from locators + """ + row_scaler = is_scalar(row_loc) + col_scaler = is_scalar(col_loc) + + if row_scaler and col_scaler: + ndim = 0 + elif row_scaler ^ col_scaler: + ndim = 1 + else: + ndim = 2 + + return ndim class _Location_Indexer_Base(): """Base class for location indexer like loc and iloc - This class abstract away commonly used method """ def __init__(self, ray_df): self.df = ray_df + self.col_coord_df = ray_df._col_metadata._coord_df + self.row_coord_df = ray_df._row_metadata._coord_df + self.block_oids = ray_df._block_partitions - def __getitem__(self, key): - if not isinstance(key, tuple): - # The one argument case is equivalent to full slice in 2nd dim. - return self.locate_2d(key, slice(None)) - else: - return self.locate_2d(*key) + self.is_view = False + if isinstance(ray_df, DataFrameView): + self.block_oids = ray_df._block_partitions_data + self.is_view = True - def _get_lookup_dict(self, ray_partition_idx): - if ray_partition_idx.ndim == 1: # Single row matched - position = (ray_partition_idx['partition'], - ray_partition_idx['index_within_partition']) - rows_to_lookup = {position[0]: [position[1]]} - if ray_partition_idx.ndim == 2: # Multiple rows matched - # We copy ray_partition_idx because it allows us to - # do groupby. This might not be the most efficient method. - # And have room to optimize. - ray_partition_idx = ray_partition_idx.copy() - rows_to_lookup = ray_partition_idx.groupby('partition').aggregate( - lambda x: list(x)).to_dict()['index_within_partition'] - return rows_to_lookup - - def locate_2d(self, row_label, col_label): - pass - - def _map_partition(self, lookup_dict, col_lst, indexer='loc'): - """Apply retrieval function to a lookup_dict - in the form of {partition_id: [idx]}. - - Returns: - retrieved_rows_remote: a list of object ids for pd_df + def __getitem__(self, row_lookup, col_lookup, ndim): """ - assert indexer in ['loc', 'iloc'], "indexer must be loc or iloc" + Args: + row_lookup: A pd dataframe, a partial view from row_coord_df + col_lookup: A pd dataframe, a partial view from col_coord_df + ndim: the dimension of returned data + """ + if ndim == 2: + return self._generate_view(row_lookup, col_lookup) - if indexer == 'loc': + extracted = self._retrive_items(row_lookup, col_lookup) + if ndim == 1: + result = ray.get(_blocks_to_col.remote(*extracted)).squeeze() - def retrieve_func(df, idx_lst, col_label): - return df.loc[idx_lst, col_label] - elif indexer == 'iloc': + if is_scalar(result): + result = pd.Series(result) - def retrieve_func(df, idx_lst, col_idx): - return df.iloc[idx_lst, col_idx] + scaler_axis = row_lookup if len(row_lookup) == 1 else col_lookup + series_name = scaler_axis.iloc[0].name + result.name = series_name - retrieved_rows_remote = [ - _deploy_func.remote(retrieve_func, - self.df._row_partitions[partition], - idx_to_lookup, col_lst) - for partition, idx_to_lookup in lookup_dict.items() - ] - return retrieved_rows_remote + index_axis = row_lookup if len(col_lookup) == 1 else col_lookup + result.index = index_axis.index + + if ndim == 0: + result = ray.get(extracted[0]).squeeze() + + return result + + def _retrive_items(self, row_lookup, col_lookup): + """Given lookup dataframes, return a list of result oids + """ + result_oids = [] + + # We have to copy before we groupby because + # https://github.com/pandas-dev/pandas/issues/10043 + row_groups = row_lookup.copy().groupby('partition') + col_groups = col_lookup.copy().groupby('partition') + for row_blk, row_data in row_groups: + for col_blk, col_data in col_groups: + block_oid = self.block_oids[row_blk, col_blk] + row_idx = row_data['index_within_partition'] + col_idx = col_data['index_within_partition'] + + result_oid = extractor.remote(block_oid, row_idx, col_idx) + result_oids.append(result_oid) + return result_oids + + def _generate_view(self, row_lookup, col_lookup): + """Generate a DataFrameView from lookup + """ + row_metadata_view = _IndexMetadata( + _coord_df=row_lookup, _lengths=self.df._row_metadata._lengths) + + col_metadata_view = _IndexMetadata( + _coord_df=col_lookup, _lengths=self.df._col_metadata._lengths) + + df_view = DataFrameView( + block_partitions=self.block_oids, + row_metadata=row_metadata_view, + col_metadata=col_metadata_view, + index=row_metadata_view.index, + columns=col_metadata_view.index) + + return df_view + + def __setitem__(self, row_lookup, col_lookup, item): + """ + Args: + row_lookup: A pd dataframe, a partial view from row_coord_df + col_lookup: A pd dataframe, a partial view from col_coord_df + item: The new item needs to be set. It can be any shape that's + broadcastable to the product of the lookup tables. + """ + to_shape = (len(row_lookup), len(col_lookup)) + item = self._broadcast_item(item, to_shape) + self._write_items(row_lookup, col_lookup, item) + + def _broadcast_item(self, item, to_shape): + """Use numpy to broadcast or reshape item. + + Notes: + - Numpy is memory efficent, there shouldn't be performance issue. + """ + try: + item = np.array(item) + if np.prod(to_shape) == np.prod(item.shape): + return item.reshape(to_shape) + else: + return np.broadcast_to(item, to_shape) + except ValueError: + from_shape = np.array(item).shape + raise ValueError( + "could not broadcast input array from \ + shape {from_shape} into shape {to_shape}".format( + from_shape=from_shape, to_shape=to_shape)) + + def _write_items(self, row_lookup, col_lookup, item): + """Perform remote write and replace blocks. + """ + + # We have to copy before we groupby because + # https://github.com/pandas-dev/pandas/issues/10043 + row_groups = row_lookup.copy().groupby('partition') + col_groups = col_lookup.copy().groupby('partition') + + row_item_index = 0 + for row_blk, row_data in row_groups: + row_len = len(row_data) + + col_item_index = 0 + for col_blk, col_data in col_groups: + col_len = len(col_data) + + block_oid = self.block_oids[row_blk, col_blk] + row_idx = row_data['index_within_partition'] + col_idx = col_data['index_within_partition'] + + item_to_write = item[row_item_index:row_item_index + row_len, + col_item_index:col_item_index + col_len] + + result_oid = writer.remote(block_oid, row_idx, col_idx, + item_to_write) + + if self.is_view: + self.df._block_partitions_data[row_blk, + col_blk] = result_oid + else: + self.df._block_partitions[row_blk, col_blk] = result_oid + + col_item_index += col_len + row_item_index += row_len class _Loc_Indexer(_Location_Indexer_Base): """A indexer for ray_df.loc[] functionality""" - def locate_2d(self, row_label, col_label): - index_loc = self.df._row_index.loc[row_label] - lookup_dict = self._get_lookup_dict(index_loc) - retrieved_rows_remote = self._map_partition( - lookup_dict, col_label, indexer='loc') - joined_df = pd.concat(ray.get(retrieved_rows_remote)) + def __getitem__(self, key): + row_loc, col_loc, ndim = _parse_tuple(key) + self._handle_enlargement(row_loc, col_loc) + row_lookup, col_lookup = self._compute_lookup(row_loc, col_loc) + ndim = self._expand_dim(row_lookup, col_lookup, ndim) + result = super(_Loc_Indexer, self).__getitem__(row_lookup, col_lookup, + ndim) + return result - if index_loc.ndim == 2: - # The returned result need to be indexed series/df - # Re-index is needed. - joined_df.index = index_loc.index + def __setitem__(self, key, item): + row_loc, col_loc, _ = _parse_tuple(key) + self._handle_enlargement(row_loc, col_loc) + row_lookup, col_lookup = self._compute_lookup(row_loc, col_loc) + super(_Loc_Indexer, self).__setitem__(row_lookup, col_lookup, + item) - if isinstance(row_label, int) or isinstance(row_label, str): - return joined_df.squeeze(axis=0) - else: - return joined_df + def _handle_enlargement(self, row_loc, col_loc): + """Handle Enlargement (if there is one). + + Returns: + None + """ + locators = [row_loc, col_loc] + coord_dfs = [self.row_coord_df, self.col_coord_df] + axis = ['row', 'col'] + metadata = {'row': self.df._row_metadata, 'col': self.df._col_metadata} + + for loc, coord, axis in zip(locators, coord_dfs, axis): + if _is_enlargement(loc, coord): + new_meta = self._enlarge_axis(loc, axis=axis) + _warn_enlargement() + metadata[axis] = new_meta + + self.row_coord_df = metadata['row']._coord_df + self.col_coord_df = metadata['col']._coord_df + + def _enlarge_axis(self, locator, axis): + """Add rows/columns to block partitions according to locator. + + Returns: + metadata (_IndexMetadata) + """ + # 1. Prepare variables + row_based_bool = axis == 'row' + # major == the axis of the locator + major_meta = self.df._row_metadata if row_based_bool \ + else self.df._col_metadata + minor_meta = self.df._col_metadata if row_based_bool \ + else self.df._row_metadata + + # 2. Compute the nan labels and add blocks + nan_labels = self._compute_enlarge_labels(locator, major_meta.index) + num_nan_labels = len(nan_labels) + blk_part_n_row, blk_part_n_col = self.block_oids.shape + + nan_blk_lens = minor_meta._lengths + nan_blks = np.array([[ + _get_nan_block_id( + num_nan_labels, n_cols, transpose=not row_based_bool) + for n_cols in nan_blk_lens + ]]) + nan_blks = nan_blks.T if not row_based_bool else nan_blks + + self.block_oids = np.concatenate( + [self.block_oids, nan_blks], axis=0 if row_based_bool else 1) + + # 3. Prepare metadata to return + nan_coord_df = pd.DataFrame(data=[{ + '': name, + 'partition': blk_part_n_row if row_based_bool else blk_part_n_col, + 'index_within_partition': i + } for name, i in zip(nan_labels, np.arange(num_nan_labels)) + ]).set_index('') + + coord_df = pd.concat([major_meta._coord_df, nan_coord_df]) + coord_df = coord_df.loc[locator] # Re-index that allows duplicates + + lens = major_meta._lengths + lens = np.concatenate([lens, np.array([num_nan_labels])]) + + metadata_view = _IndexMetadata(_coord_df=coord_df, _lengths=lens) + return metadata_view + + def _compute_enlarge_labels(self, locator, base_index): + """Helper for _enlarge_axis, compute common labels and extra labels. + + Returns: + nan_labels: The labels needs to be added + """ + locator_as_index = pd.Index(locator) + + nan_labels = locator_as_index.difference(base_index) + common_labels = locator_as_index.intersection(base_index) + + if len(common_labels) == 0: + raise KeyError( + 'None of [{labels}] are in the [{base_index_name}]'.format( + labels=list(locator_as_index), base_index_name=base_index)) + + return nan_labels + + def _expand_dim(self, row_lookup, col_lookup, ndim): + """Expand the dimension if necessary. + This method is for cases like duplicate labels. + """ + many_rows = len(row_lookup) > 1 + many_cols = len(col_lookup) > 1 + + if ndim == 0 and (many_rows or many_cols): + ndim = 1 + if ndim == 1 and (many_rows and many_cols): + ndim = 2 + + return ndim + + def _compute_lookup(self, row_loc, col_loc): + # We use reindex for list to avoid duplicates. + row_lookup = self.row_coord_df.loc[row_loc] + col_lookup = self.col_coord_df.loc[col_loc] + return row_lookup, col_lookup class _iLoc_Indexer(_Location_Indexer_Base): """A indexer for ray_df.iloc[] functionality""" - def locate_2d(self, row_idx, col_idx): - index_loc = self.df._row_index.iloc[row_idx] - lookup_dict = self._get_lookup_dict(index_loc) - retrieved_rows_remote = self._map_partition( - lookup_dict, col_idx, indexer='iloc') - joined_df = pd.concat(ray.get(retrieved_rows_remote)) + def __getitem__(self, key): + row_loc, col_loc, ndim = _parse_tuple(key) - if index_loc.ndim == 2: - # The returned result need to be indexed series/df - # Re-index is needed. - joined_df.index = index_loc.index + self._check_dtypes(row_loc) + self._check_dtypes(col_loc) - if isinstance(row_idx, int) or isinstance(row_idx, str): - return joined_df.squeeze(axis=0) - else: - return joined_df + row_lookup, col_lookup = self._compute_lookup(row_loc, col_loc) + result = super(_iLoc_Indexer, self).__getitem__( + row_lookup, col_lookup, ndim) + return result + + def __setitem__(self, key, item): + row_loc, col_loc, _ = _parse_tuple(key) + + self._check_dtypes(row_loc) + self._check_dtypes(col_loc) + + row_lookup, col_lookup = self._compute_lookup(row_loc, col_loc) + super(_iLoc_Indexer, self).__setitem__( + row_lookup, col_lookup, item) + + def _compute_lookup(self, row_loc, col_loc): + # We use reindex for list to avoid duplicates. + return self.row_coord_df.iloc[row_loc], self.col_coord_df.iloc[col_loc] + + def _check_dtypes(self, locator): + is_int = is_integer(locator) + is_int_slice = is_integer_slice(locator) + is_int_list = is_list_like(locator) and all(map(is_integer, locator)) + is_bool_arr = is_boolean_array(locator) + + if not any([is_int, is_int_slice, is_int_list, is_bool_arr]): + raise ValueError(_ILOC_INT_ONLY_ERROR) + + +class DataFrameView(DataFrame): + """A subclass of DataFrame where the index can be smaller than blocks. + """ + + def __init__(self, block_partitions, row_metadata, col_metadata, index, + columns): + self._block_partitions = block_partitions + self._row_metadata = row_metadata + self._col_metadata = col_metadata + self.index = index + self.columns = columns + + def _get_block_partitions(self): + oid_arr = _mask_block_partitions(self._block_partitions_data, + self._row_metadata, + self._col_metadata) + return oid_arr + + def _set_block_partitions(self, new_block_partitions): + self._block_partitions_data = new_block_partitions + + _block_partitions = property(_get_block_partitions, _set_block_partitions) diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index b9954e0e4..26a97c2af 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -9,6 +9,29 @@ import ray from . import get_npartitions +_NAN_BLOCKS = dict() + + +def _get_nan_block_id(n_row=1, n_col=1, transpose=False): + """A memory efficent way to get a block of NaNs. + + Args: + n_rows(int): number of rows + n_col(int): number of columns + transpose(bool): if true, swap rows and columns + Returns: + ObjectID of the NaN block + """ + global _NAN_BLOCKS + if transpose: + n_row, n_col = n_col, n_row + shape = (n_row, n_col) + if shape not in _NAN_BLOCKS: + arr = np.tile(np.array(np.NaN), shape) + _NAN_BLOCKS[shape] = ray.put(pd.DataFrame(data=arr)) + return _NAN_BLOCKS[shape] + + def _get_lengths(df): """Gets the length of the dataframe. Args: @@ -72,9 +95,11 @@ def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None): row_partitions.append(top) temp_df = temp_df[row_chunksize:] else: - if len(df) > row_chunksize: - temp_df.reset_index(drop=True, inplace=True) - temp_df.columns = pd.RangeIndex(0, len(temp_df.columns)) + # Handle the last chunk correctly. + # This call is necessary to prevent modifying original df + temp_df = temp_df[:] + temp_df.reset_index(drop=True, inplace=True) + temp_df.columns = pd.RangeIndex(0, len(temp_df.columns)) row_partitions.append(ray.put(temp_df)) return row_partitions @@ -113,6 +138,58 @@ def to_pandas(df): return pd_df +@ray.remote +def extractor(df_chunk, row_loc, col_loc): + """Retrieve an item from remote block + """ + # We currently have to do the writable flag trick because a pandas bug + # https://github.com/pandas-dev/pandas/issues/17192 + try: + row_loc.flags.writeable = True + col_loc.flags.writeable = True + except AttributeError: + # Locators might be scaler or python list + pass + return df_chunk.iloc[row_loc, col_loc] + + +@ray.remote +def writer(df_chunk, row_loc, col_loc, item): + """Make a copy of the block and write new item to it + """ + df_chunk = df_chunk.copy() + df_chunk.iloc[row_loc, col_loc] = item + return df_chunk + + +def _mask_block_partitions(blk_partitions, row_metadata, col_metadata): + """Return the squeezed/expanded block partitions as defined by + row_metadata and col_metadata. + + Note: + Very naive implementation. Extract one scaler at a time in a double + for loop. + """ + col_df = col_metadata._coord_df + row_df = row_metadata._coord_df + + result_oids = [] + shape = (len(row_df.index), len(col_df.index)) + + for _, row_partition_data in row_df.iterrows(): + for _, col_partition_data in col_df.iterrows(): + row_part = row_partition_data.partition + col_part = col_partition_data.partition + block_oid = blk_partitions[row_part, col_part] + + row_idx = row_partition_data['index_within_partition'] + col_idx = col_partition_data['index_within_partition'] + + result_oid = extractor.remote(block_oid, [row_idx], [col_idx]) + result_oids.append(result_oid) + return np.array(result_oids).reshape(shape) + + @ray.remote def _deploy_func(func, dataframe, *args): """Deploys a function for the _map_partitions call. @@ -227,8 +304,11 @@ def create_blocks_helper(df, npartitions, axis): @ray.remote def _blocks_to_col(*partition): - return pd.concat(partition, axis=0, copy=False)\ - .reset_index(drop=True) + if len(partition): + return pd.concat(partition, axis=0, copy=False)\ + .reset_index(drop=True) + else: + return pd.Series() @ray.remote