diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 317d71de7..f333b0347 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -15,9 +15,7 @@ from pandas.core.dtypes.common import ( is_list_like, is_numeric_dtype, is_timedelta64_dtype) -from pandas.core.indexing import ( - check_bool_indexer, - convert_to_index_sliceable) +from pandas.core.indexing import check_bool_indexer import warnings import numpy as np @@ -29,19 +27,18 @@ from .utils import ( _map_partitions, _partition_pandas_dataframe, to_pandas, - _build_index, _blocks_to_col, _blocks_to_row, - _build_columns, _create_block_partitions) from . import get_npartitions +from .index_metadata import _IndexMetadata class DataFrame(object): def __init__(self, data=None, index=None, columns=None, dtype=None, copy=False, col_partitions=None, row_partitions=None, - block_partitions=None): + block_partitions=None, row_metadata=None, col_metadata=None): """Distributed DataFrame object backed by Pandas dataframes. Args: @@ -60,6 +57,10 @@ class DataFrame(object): row_partitions ([ObjectID]): The list of ObjectIDs that contain the row dataframe partitions. block_partitions: A 2D numpy array of block partitions. 
+ row_metadata (_IndexMetadata): + Metadata for the new dataframe's rows + col_metadata (_IndexMetadata): + Metadata for the new dataframe's columns """ # Check type of data and use appropriate constructor if data is not None or (col_partitions is None and @@ -89,18 +90,27 @@ class DataFrame(object): "Columns not defined, must define columns for internal " \ "DataFrame creations" + self._row_metadata = self._col_metadata = None if block_partitions is not None: # put in numpy array here to make accesses easier since it's 2D self._block_partitions = np.array(block_partitions) + if row_metadata is not None: + self._row_metadata = row_metadata.copy() + if col_metadata is not None: + self._col_metadata = col_metadata.copy() assert self._block_partitions.ndim == 2, \ "Block Partitions must be 2D." else: if row_partitions is not None: axis = 0 partitions = row_partitions + if row_metadata is not None: + self._row_metadata = row_metadata.copy() elif col_partitions is not None: axis = 1 partitions = col_partitions + if col_metadata is not None: + self._col_metadata = col_metadata.copy() self._block_partitions = \ _create_block_partitions(partitions, axis=axis, @@ -114,10 +124,13 @@ class DataFrame(object): axis=axis ^ 1) # Create the row and column index objects for using our partitioning. 
- self._row_lengths, self._row_index = \ - _build_index.remote(self._block_partitions[:, 0], index) - self._col_lengths, self._col_index = \ - _build_columns.remote(self._block_partitions[0, :], columns) + # If the objects haven't been inherited, then generate them + if not self._row_metadata: + self._row_metadata = _IndexMetadata(self._block_partitions[:, 0], + index=index, axis=0) + if not self._col_metadata: + self._col_metadata = _IndexMetadata(self._block_partitions[0, :], + index=columns, axis=1) def _get_row_partitions(self): return [_blocks_to_row.remote(*part) @@ -145,7 +158,7 @@ class DataFrame(object): return repr(self) def __repr__(self): - if sum(self._row_lengths) < 60: + if len(self._row_metadata) < 60: result = repr(to_pandas(self)) return result @@ -154,7 +167,7 @@ class DataFrame(object): new_dfs = _map_partitions(lambda df: df.head(n), df) - index = self._row_index.head(n).index + index = self.index[:n] pd_head = pd.concat(ray.get(new_dfs), axis=1, copy=False) pd_head.index = index pd_head.columns = self.columns @@ -166,7 +179,7 @@ class DataFrame(object): new_dfs = _map_partitions(lambda df: df.tail(n), df) - index = self._row_index.tail(n).index + index = self.index[-n:] pd_tail = pd.concat(ray.get(new_dfs), axis=1, copy=False) pd_tail.index = index pd_tail.columns = self.columns @@ -198,10 +211,7 @@ class DataFrame(object): Returns: The union of all indexes across the partitions. """ - if isinstance(self._row_index, pd.core.indexes.range.RangeIndex) or \ - isinstance(self._row_index, pd.core.indexes.base.Index): - return self._row_index - return self._row_index.index + return self._row_metadata.index def _set_index(self, new_index): """Set the index for this DataFrame. 
@@ -209,47 +219,17 @@ class DataFrame(object): Args: new_index: The new index to set this """ - if isinstance(self._row_index, pd.core.indexes.range.RangeIndex) or \ - isinstance(self._row_index, pd.core.indexes.base.Index): - self._row_index = new_index - else: - self._row_index.index = new_index + self._row_metadata.index = new_index index = property(_get_index, _set_index) - def _get__row_index(self): - """Get the _row_index for this DataFrame. - - Returns: - The default index. - """ - if self._row_index_cache is None: - return None - - if isinstance(self._row_index_cache, ray.local_scheduler.ObjectID): - self._row_index_cache = ray.get(self._row_index_cache) - return self._row_index_cache - - def _set__row_index(self, new__index): - """Set the _row_index for this DataFrame. - - Args: - new__index: The new default index to set. - """ - self._row_index_cache = new__index - - _row_index = property(_get__row_index, _set__row_index) - def _get_columns(self): """Get the columns for this DataFrame. Returns: The union of all indexes across the partitions. """ - if isinstance(self._col_index, pd.core.indexes.range.RangeIndex) or \ - isinstance(self._col_index, pd.core.indexes.base.Index): - return self._col_index - return self._col_index.index + return self._col_metadata.index def _set_columns(self, new_index): """Set the columns for this DataFrame. @@ -257,101 +237,16 @@ class DataFrame(object): Args: new_index: The new index to set this """ - if isinstance(self._col_index, pd.core.indexes.range.RangeIndex) or \ - isinstance(self._col_index, pd.core.indexes.base.Index): - self._col_index = new_index - return - self._col_index.index = new_index + self._col_metadata.index = new_index columns = property(_get_columns, _set_columns) - def _get__col_index(self): - """Get the _col_index for this DataFrame. - - Returns: - The default index. 
- """ - if self._col_index_cache is None: - return None - - if isinstance(self._col_index_cache, ray.local_scheduler.ObjectID): - self._col_index_cache = ray.get(self._col_index_cache) - return self._col_index_cache - - def _set__col_index(self, new__index): - """Set the _col_index for this DataFrame. - - Args: - new__index: The new default index to set. - """ - self._col_index_cache = new__index - - _col_index = property(_get__col_index, _set__col_index) - - def _get_row_lengths(self): - """Gets the lengths for each partition and caches it if it wasn't. - - Returns: - A list of integers representing the length of each partition. - """ - if self._row_length_cache is None: - return None - if isinstance(self._row_length_cache, ray.local_scheduler.ObjectID): - self._row_length_cache = ray.get(self._row_length_cache) - elif isinstance(self._row_length_cache, list) and \ - isinstance(self._row_length_cache[0], - ray.local_scheduler.ObjectID): - self._row_length_cache = ray.get(self._row_length_cache) - return self._row_length_cache - - def _set_row_lengths(self, lengths): - """Sets the lengths of each partition for this DataFrame. - - We use this because we can compute it when creating the DataFrame. - - Args: - lengths ([ObjectID or Int]): A list of lengths for each - partition, in order. - """ - self._row_length_cache = lengths - - _row_lengths = property(_get_row_lengths, _set_row_lengths) - - def _get_col_lengths(self): - """Gets the lengths for each partition and caches it if it wasn't. - - Returns: - A list of integers representing the length of each partition. 
- """ - if self._col_length_cache is None: - return None - if isinstance(self._col_length_cache, ray.local_scheduler.ObjectID): - self._col_length_cache = ray.get(self._col_length_cache) - elif isinstance(self._col_length_cache, list) and \ - isinstance(self._col_length_cache[0], - ray.local_scheduler.ObjectID): - self._col_length_cache = ray.get(self._col_length_cache) - return self._col_length_cache - - def _set_col_lengths(self, lengths): - """Sets the lengths of each partition for this DataFrame. - - We use this because we can compute it when creating the DataFrame. - - Args: - lengths ([ObjectID or Int]): A list of lengths for each - partition, in order. - """ - self._col_length_cache = lengths - - _col_lengths = property(_get_col_lengths, _set_col_lengths) - def _arithmetic_helper(self, remote_func, axis, level=None): # TODO: We don't support `level` right now if level is not None: raise NotImplementedError("Level not yet supported.") - axis = self._row_index._get_axis_number(axis) if axis is not None \ + axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ else 0 oid_series = ray.get(_map_partitions(remote_func, @@ -362,12 +257,9 @@ class DataFrame(object): # We use the index to get the internal index. 
oid_series = [(oid_series[i], i) for i in range(len(oid_series))] - for df, index in oid_series: - this_partition = \ - self._col_index[self._col_index['partition'] == index] - df.index = this_partition[ - this_partition['index_within_partition'].isin(df.index) - ].index + for df, partition in oid_series: + this_partition = self._col_metadata.partition_series(partition) + df.index = this_partition[this_partition.isin(df.index)].index result_series = pd.concat([obj[0] for obj in oid_series], axis=0, copy=False) @@ -502,10 +394,10 @@ class DataFrame(object): if row_partitions is not None or col_partitions is not None: # At least one partition list is being updated, so recompute # lengths and indices - self._row_lengths, self._row_index = \ - _build_index.remote(self._block_partitions[:, 0], index) - self._col_lengths, self._col_index = \ - _build_columns.remote(self._block_partitions[0, :], columns) + self._row_metadata = _IndexMetadata(self._block_partitions[:, 0], + index=index, axis=0) + self._col_metadata = _IndexMetadata(self._block_partitions[0, :], + index=columns, axis=1) def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -920,7 +812,7 @@ class DataFrame(object): "github.com/ray-project/ray.") def _cumulative_helper(self, func, axis): - axis = self._row_index._get_axis_number(axis) if axis is not None \ + axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ else 0 if axis == 0: @@ -1024,12 +916,9 @@ class DataFrame(object): # We use the index to get the internal index. 
parts = [(parts[i], i) for i in range(len(parts))] - for df, index in parts: - this_partition = \ - self._col_index[self._col_index['partition'] == index] - df.columns = this_partition[ - this_partition['index_within_partition'].isin(df.columns) - ].index + for df, partition in parts: + this_partition = self._col_metadata.partition_series(partition) + df.columns = this_partition[this_partition.isin(df.columns)].index # Remove index from tuple result = pd.concat([obj[0] for obj in parts], axis=1, copy=False) @@ -1085,21 +974,24 @@ class DataFrame(object): if index is not None or columns is not None: raise ValueError("Cannot specify both 'labels' and " "'index'/'columns'") - axis = self._row_index._get_axis_name(axis) + axis = pd.DataFrame()._get_axis_name(axis) axes = {axis: labels} elif index is not None or columns is not None: - axes, _ = self._row_index._construct_axes_from_arguments((index, - columns), - {}) + axes, _ = pd.DataFrame()._construct_axes_from_arguments((index, + columns), + {}) else: raise ValueError("Need to specify at least one of 'labels', " "'index' or 'columns'") obj = self.copy() def drop_helper(obj, axis, label): + # TODO(patyang): If you drop from the index first, you can do it + # in batch by returning the dropped items. Likewise coords.drop + # leaves the coords df in an inconsistent state. if axis == 'index': try: - coords = obj._row_index.loc[label] + coords = obj._row_metadata[label] if isinstance(coords, pd.DataFrame): partitions = list(coords['partition']) indexes = list(coords['index_within_partition']) @@ -1121,18 +1013,14 @@ class DataFrame(object): # The decrement here is because we're dropping one at a # time and the index is automatically updated when we # convert back to blocks. 
- obj._row_index = obj._row_index.copy() - obj._row_index.loc[ - (obj._row_index.partition == part) & - (obj._row_index.index_within_partition > index), - 'index_within_partition'] -= 1 + obj._row_metadata.squeeze(part, index) - obj._row_index.drop(labels=label, axis=0, inplace=True) + obj._row_metadata.drop(labels=label) except KeyError: return obj else: try: - coords = obj._col_index.loc[label] + coords = obj._col_metadata[label] if isinstance(coords, pd.DataFrame): partitions = list(coords['partition']) indexes = list(coords['index_within_partition']) @@ -1154,13 +1042,9 @@ class DataFrame(object): # The decrement here is because we're dropping one at a # time and the index is automatically updated when we # convert back to blocks. - obj._col_index = obj._col_index.copy() - obj._col_index.loc[ - (obj._col_index.partition == part) & - (obj._col_index.index_within_partition > index), - 'index_within_partition'] -= 1 + obj._col_metadata.squeeze(part, index) - obj._col_index.drop(labels=label, axis=0, inplace=True) + obj._col_metadata.drop(labels=label) except KeyError: return obj @@ -1189,8 +1073,8 @@ class DataFrame(object): if not inplace: return obj else: - self._row_index = obj._row_index - self._col_index = obj._col_index + self._row_metadata = obj._row_metadata + self._col_metadata = obj._col_metadata self._block_partitions = obj._block_partitions def drop_duplicates(self, subset=None, keep='first', inplace=False): @@ -1223,14 +1107,15 @@ class DataFrame(object): results = [] other_partition = None other_df = None - for i, idx in other._row_index.iterrows(): + # TODO: Make the appropriate coord df accessor methods for this fxn + for i, idx in other._row_metadata._coord_df.iterrows(): if idx['partition'] != other_partition: other_df = ray.get(other._row_partitions[idx['partition']]) other_partition = idx['partition'] # TODO: group series here into full df partitions to reduce # the number of remote calls to helper other_series = 
other_df.iloc[idx['index_within_partition']] - curr_index = self._row_index.iloc[i] + curr_index = self._row_metadata._coord_df.iloc[i] curr_df = self._row_partitions[int(curr_index['partition'])] results.append(_deploy_func.remote(helper, curr_df, @@ -1299,7 +1184,8 @@ class DataFrame(object): inplace = validate_bool_kwarg(inplace, "inplace") new_rows = _map_partitions(eval_helper, self._row_partitions) - columns_copy = self._col_index.T.copy() + # TODO: This doesn't work if the expression is not an assignment + columns_copy = self._col_metadata._coord_df.T.copy() columns_copy.eval(expr, inplace=True, **kwargs) columns = columns_copy.columns @@ -1369,7 +1255,7 @@ class DataFrame(object): inplace = validate_bool_kwarg(inplace, 'inplace') - axis = self._row_index._get_axis_number(axis) \ + axis = pd.DataFrame()._get_axis_number(axis) \ if axis is not None \ else 0 @@ -1392,12 +1278,10 @@ class DataFrame(object): else: new_obj = self.copy() - if axis == 0: - parts = new_obj._col_partitions - idx_obj = new_obj._col_index - else: - parts = new_obj._row_partitions - idx_obj = new_obj._row_index + parts, coords_obj = (new_obj._col_partitions, + new_obj._col_metadata) if axis == 0 else \ + (new_obj._row_partitions, + new_obj._row_metadata) if isinstance(value, (pd.Series, dict)): new_vals = {} @@ -1405,7 +1289,7 @@ class DataFrame(object): for val in value: # Get the local index for the partition try: - part, index = idx_obj.loc[val] + part, index = coords_obj[val] # Pandas ignores these errors so we will suppress them too. except KeyError: continue @@ -1421,8 +1305,8 @@ class DataFrame(object): # Not every partition was changed, so we put everything back that # was not changed and update those that were. 
- new_parts = [parts[i] if idx_obj.index[i] not in new_vals - else new_vals[idx_obj.index[i]] + new_parts = [parts[i] if coords_obj.index[i] not in new_vals + else new_vals[coords_obj.index[i]] for i in range(len(parts))] else: new_parts = _map_partitions(lambda df: df.fillna( @@ -1461,9 +1345,7 @@ class DataFrame(object): Returns: scalar: type of index """ - if self._row_index is not None: - return self._row_index.first_valid_index() - return None + return self._row_metadata.first_valid_index() def floordiv(self, other, axis='columns', level=None, fill_value=None): raise NotImplementedError( @@ -1561,15 +1443,13 @@ class DataFrame(object): Returns: A new dataframe with the first n rows of the dataframe. """ - sizes = self._row_lengths - - if n >= sum(sizes): + if n >= len(self._row_metadata): return self.copy() new_dfs = _map_partitions(lambda df: df.head(n), self._col_partitions) - index = self._row_index.head(n).index + index = self._row_metadata.index[:n] return DataFrame(col_partitions=new_dfs, columns=self.columns, @@ -1662,27 +1542,25 @@ class DataFrame(object): if loc < 0: raise ValueError("unbounded slice") - # Perform insert on a specific column partition - # Determine which column partition to place it in, and where in that - # partition - col_cum_lens = np.cumsum(self._col_lengths) - col_part_idx = np.digitize(loc, col_cum_lens[:-1]) - col_part_loc = loc - np.asscalar( - np.concatenate(([0], col_cum_lens))[col_part_idx]) + partition, index_within_partition = \ + self._col_metadata.insert(column, loc) # Deploy insert function to specific column partition, and replace that # column def insert_col_part(df): - df.insert(col_part_loc, column, value, allow_duplicates) + df.insert(index_within_partition, column, value, allow_duplicates) return df + print('partition:', partition) + print('i_w_partition', index_within_partition) + print('df:\n', ray.get(self._col_partitions[partition])) new_obj = _deploy_func.remote(insert_col_part, - 
self._col_partitions[col_part_idx]) + self._col_partitions[partition]) new_cols = [self._col_partitions[i] - if i != col_part_idx + if i != partition else new_obj for i in range(len(self._col_partitions))] - new_col_names = self._col_index.index.insert(loc, column) + new_col_names = self.columns.insert(loc, column) self._update_inplace(col_partitions=new_cols, columns=new_col_names) @@ -1816,9 +1694,7 @@ class DataFrame(object): Returns: scalar: type of index """ - if self._row_index is not None: - return self._row_index.last_valid_index() - return None + return self._row_metadata.last_valid_index() def le(self, other, axis='columns', level=None): raise NotImplementedError( @@ -2236,8 +2112,7 @@ class DataFrame(object): if axes_is_columns: renamed.columns.name = mapper else: - renamed._row_index.rename_axis(mapper, axis=axis, copy=copy, - inplace=True) + renamed.index.name = mapper if not inplace: return renamed @@ -2257,7 +2132,7 @@ class DataFrame(object): if axes_is_columns: renamed.columns.set_names(name) else: - renamed._row_index.set_names(name) + renamed.index.set_names(name) if not inplace: return renamed @@ -2333,10 +2208,7 @@ class DataFrame(object): return values # We're building a new default index dataframe for use later. - _, new_index = \ - _build_index.remote(new_obj._block_partitions[:, 0], None) - - new_index = ray.get(new_index).index + new_index = pd.RangeIndex(len(self)) if level is not None: if not isinstance(level, (tuple, list)): level = [level] @@ -2485,7 +2357,7 @@ class DataFrame(object): FutureWarning, stacklevel=2) inplace = True if inplace: - setattr(self, self._row_index._get_axis_name(axis), labels) + setattr(self, pd.DataFrame()._get_axis_name(axis), labels) else: obj = self.copy() obj.set_axis(labels, axis=axis, inplace=True) @@ -2666,15 +2538,13 @@ class DataFrame(object): Returns: A new dataframe with the last n rows of this dataframe. 
""" - sizes = self._row_lengths - - if n >= sum(sizes): + if n >= len(self._row_metadata): return self new_dfs = _map_partitions(lambda df: df.tail(n), self._col_partitions) - index = self._row_index.tail(n).index + index = self._row_metadata.index[-n:] return DataFrame(col_partitions=new_dfs, columns=self.columns, index=index) @@ -2921,7 +2791,7 @@ class DataFrame(object): pass # see if we can slice the rows - indexer = convert_to_index_sliceable(self._row_index, key) + indexer = self._row_metadata.convert_to_index_sliceable(key) if indexer is not None: raise NotImplementedError("To contribute to Pandas on Ray, please" "visit github.com/ray-project/ray.") @@ -2941,7 +2811,8 @@ class DataFrame(object): return self._getitem_column(key) def _getitem_column(self, key): - partition = self._col_index.loc[key].loc['partition'] + # may result in multiple columns? + partition = self._col_metadata[key, 'partition'] result = ray.get(self._getitem_indiv_col(key, partition)) result.name = key result.index = self.index @@ -2983,7 +2854,7 @@ class DataFrame(object): index=index) def _getitem_indiv_col(self, key, part): - loc = self._col_index.loc[key] + loc = self._col_metadata[key] if isinstance(loc, pd.Series): index = loc[loc['partition'] == part] else: @@ -3019,7 +2890,7 @@ class DataFrame(object): Returns: Returns an integer length of the dataframe object. """ - return sum(self._row_lengths) + return len(self._row_metadata) def __unicode__(self): raise NotImplementedError( @@ -3126,7 +2997,7 @@ class DataFrame(object): del_helper, self._row_partitions, to_delete) # This structure is used to get the correct index inside the partition. - del_df = self._col_index.loc[key] + del_df = self._col_metadata[key] # We need to standardize between multiple and single occurrences in the # columns. 
Putting single occurrences in a pd.DataFrame and transposing @@ -3136,9 +3007,9 @@ class DataFrame(object): # Cast cols as pd.Series as duplicate columns mean result may be # np.int64 or pd.Series - col_parts_to_del = pd.Series( - self._col_index.loc[key, 'partition']).unique() - self._col_index.drop(key, inplace=True) + col_parts_to_del = \ + pd.Series(self._col_metadata[key, 'partition']).unique() + self._col_metadata.drop(key) for i in col_parts_to_del: # Compute the correct index inside the partition to delete. to_delete_in_partition = \ @@ -3147,22 +3018,7 @@ class DataFrame(object): self._col_partitions[i] = _deploy_func.remote( del_helper, self._col_partitions[i], to_delete_in_partition) - partition_mask = (self._col_index['partition'] == i) - - # Since we are replacing columns with RangeIndex inside the - # partition, we have to make sure that our reference to it is - # updated as well. - try: - self._col_index.loc[partition_mask, - 'index_within_partition'] = [ - p for p in range(sum(partition_mask))] - except ValueError: - # Copy the arrow sealed dataframe so we can mutate it. - # We only do this the first time we try to mutate the sealed. - self._col_index = self._col_index.copy() - self._col_index.loc[partition_mask, - 'index_within_partition'] = [ - p for p in range(sum(partition_mask))] + self._col_metadata.reset_partition_coords(col_parts_to_del) def __finalize__(self, other, method=None, **kwargs): raise NotImplementedError( diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py new file mode 100644 index 000000000..8c492e02d --- /dev/null +++ b/python/ray/dataframe/index_metadata.py @@ -0,0 +1,341 @@ +import pandas as pd +import numpy as np +import ray + +from .utils import ( + _build_index, + _build_columns) + +from pandas.core.indexing import convert_to_index_sliceable + + +class _IndexMetadataBase(object): + """Wrapper for Pandas indexes in Ray DataFrames. 
Handles all of the + metadata specific to the axis of partition (setting indexes, + calculating the index within partition of a value, etc.) since the + dataframe may be partitioned across either axis. This way we can unify the + possible index operations over one axis-agnostic interface. + + This class is the abstract superclass for IndexMetadata and + WrappingIndexMetadata, which handle indexes along the partitioned and + non-partitioned axes, respectively. + + IMPORTANT NOTE: Currently all operations, as implemented, are inplace. + """ + + def _get__coord_df(self): + if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID): + self._coord_df_cache = ray.get(self._coord_df_cache) + return self._coord_df_cache + + def _set__coord_df(self, coord_df): + self._coord_df_cache = coord_df + + _coord_df = property(_get__coord_df, _set__coord_df) + + def _get_index(self): + """Get the index wrapped by this IndexMetadata. + + Returns: + The index wrapped by this IndexMetadata + """ + return self._coord_df.index + + def _set_index(self, new_index): + """Set the index wrapped by this IndexMetadata. 
+ + Args: + new_index: The new index to wrap + """ + self._coord_df.index = new_index + + index = property(_get_index, _set_index) + + def coords_of(self, key): + raise NotImplementedError() + + def __getitem__(self, key): + return self.coords_of(key) + + def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, + group_keys=True, squeeze=False, **kwargs): + raise NotImplementedError() + + def __len__(self): + return len(self._coord_df) + + def first_valid_index(self): + return self._coord_df.first_valid_index() + + def last_valid_index(self): + return self._coord_df.last_valid_index() + + def insert(self, key, loc=None, partition=None, + index_within_partition=None): + raise NotImplementedError() + + def drop(self, labels, errors='raise'): + """Drop the specified labels from the IndexMetadata + + Args: + labels (scalar or list-like): + The labels to drop + errors ('raise' or 'ignore'): + If 'ignore', suppress errors for when labels don't exist + + Returns: + DataFrame with coordinates of dropped labels + """ + # TODO(patyang): This produces inconsistent indexes. + dropped = self.coords_of(labels) + self._coord_df = self._coord_df.drop(labels, errors=errors) + return dropped + + def rename_index(self, mapper): + """Rename the index. + + Args: + mapper: name to rename the index as + """ + self._coord_df = self._coord_df.rename_axis(mapper, axis=0) + + def convert_to_index_sliceable(self, key): + """Converts and performs error checking on the passed slice + + Args: + key: slice to convert and check + """ + return convert_to_index_sliceable(self._coord_df, key) + + +class _IndexMetadata(_IndexMetadataBase): + """IndexMetadata implementation for index across a partitioned axis. This + implementation assumes the underlying index lies across multiple + partitions. 
+ """ + + def __init__(self, dfs, index=None, axis=0): + """Initializes an IndexMetadata from Ray DataFrame partitions + + Args: + dfs ([ObjectID]): ObjectIDs of dataframe partitions + index (pd.Index): Index of the Ray DataFrame. + axis: Axis of partition (0=row partitions, 1=column partitions) + + Returns: + An IndexMetadata backed by the specified pd.Index, laid out across + the specified partitions + """ + lengths_oid, coord_df_oid = \ + _build_index.remote(dfs, index) if axis == 0 else \ + _build_columns.remote(dfs, index) + self._coord_df = coord_df_oid + self._lengths = lengths_oid + + def _get__lengths(self): + if isinstance(self._lengths_cache, ray.local_scheduler.ObjectID) or \ + (isinstance(self._lengths_cache, list) and + isinstance(self._lengths_cache[0], ray.local_scheduler.ObjectID)): + self._lengths_cache = ray.get(self._lengths_cache) + return self._lengths_cache + + def _set__lengths(self, lengths): + self._lengths_cache = lengths + + _lengths = property(_get__lengths, _set__lengths) + + def coords_of(self, key): + """Returns the coordinates (partition, index_within_partition) of the + provided key in the index. Can be called on its own or implicitly + through __getitem__ + + Args: + key: + item to get coordinates of. Can also be a tuple of item + and {partition, index_within_partition} if caller only + needs one of the coordinates + + Returns: + Pandas object with the keys specified. If key is a single object + it will be a pd.Series with items `partition` and + `index_within_partition`, and if key is a slice or if the key is + duplicated it will be a pd.DataFrame with said items as columns. 
+ """ + return self._coord_df.loc[key] + + def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, + group_keys=True, squeeze=False, **kwargs): + # TODO: Find out what this does, and write a docstring + assignments_df = self._coord_df.groupby(by=by, axis=axis, level=level, + as_index=as_index, sort=sort, + group_keys=group_keys, + squeeze=squeeze, **kwargs)\ + .apply(lambda x: x[:]) + return assignments_df + + def partition_series(self, partition): + return self[self._coord_df['partition'] == partition, + 'index_within_partition'] + + def __len__(self): + # Hard to say if this is faster than IndexMetadataBase.__len__ if + # self._coord_df is non-resident + return sum(self._lengths) + + def reset_partition_coords(self, partitions=None): + partitions = np.array(partitions) + + for partition in partitions: + partition_mask = (self._coord_df['partition'] == partition) + # Since we are replacing columns with RangeIndex inside the + # partition, we have to make sure that our reference to it is + # updated as well. + try: + self._coord_df.loc[partition_mask, + 'index_within_partition'] = [ + p for p in range(sum(partition_mask))] + except ValueError: + # Copy the arrow sealed dataframe so we can mutate it. + # We only do this the first time we try to mutate the sealed. + self._coord_df = self._coord_df.copy() + self._coord_df.loc[partition_mask, + 'index_within_partition'] = [ + p for p in range(sum(partition_mask))] + + def insert(self, key, loc=None, partition=None, + index_within_partition=None): + """Inserts a key at a certain location in the index, or a certain coord + in a partition. Called with either `loc` or `partition` and + `index_within_partition`. If called with both, `loc` will be used. 
+ + Args: + key: item to insert into index + loc: location to insert into index + partition: partition to insert into + index_within_partition: index within partition to insert into + + Returns: + DataFrame with coordinates of insert + """ + # Perform insert on a specific partition + # Determine which partition to place it in, and where in that partition + if loc is not None: + cum_lens = np.cumsum(self._lengths) + partition = np.digitize(loc, cum_lens[:-1]) + if partition >= len(cum_lens): + if loc > cum_lens[-1]: + raise IndexError("index {0} is out of bounds".format(loc)) + else: + index_within_partition = self._lengths[-1] + else: + first_in_partition = \ + np.asscalar(np.concatenate(([0], cum_lens))[partition]) + index_within_partition = loc - first_in_partition + + # TODO: Stop-gap solution until we begin passing IndexMetadatas + return partition, index_within_partition + + # Generate new index + new_index = self.index.insert(loc, key) + + # Shift indices in partition where we inserted column + idx_locs = (self._coord_df.partition == partition) & \ + (self._coord_df.index_within_partition == + index_within_partition) + # TODO: Determine why self._coord_df{,_cache} are read-only + _coord_df_copy = self._coord_df.copy() + _coord_df_copy.loc[idx_locs, 'index_within_partition'] += 1 + + # TODO: Determine if there's a better way to do a row-index insert in + # pandas, because this is very annoying/unsure of efficiency + # Create new coord entry to insert + coord_to_insert = pd.DataFrame( + {'partition': partition, + 'index_within_partition': index_within_partition}, + index=[key]) + + # Insert into cached RangeIndex, and order by new column index + self._coord_df = _coord_df_copy.append(coord_to_insert).loc[new_index] + + # Return inserted coordinate for caller + return coord_to_insert + + def squeeze(self, partition, index_within_partition): + self._coord_df = self._coord_df.copy() + + partition_mask = self._coord_df.partition == partition + 
index_within_partition_mask = \ + self._coord_df.index_within_partition > index_within_partition + self._coord_df.loc[partition_mask & index_within_partition_mask, + 'index_within_partition'] -= 1 + + +class _WrappingIndexMetadata(_IndexMetadata): + """IndexMetadata implementation for index across a non-partitioned axis. + This implementation assumes the underlying index lies across one partition. + """ + + def __init__(self, index): + """Initializes an IndexMetadata from a Pandas Index only. + + Args: + index (pd.Index): Index to wrap. + + Returns: + An IndexMetadata backed by the specified pd.Index. + """ + self._coord_df = pd.DataFrame(index=index) + # Set _lengths as a dummy variable for future-proof method inheritance + self._lengths = [len(index)] + + def coords_of(self, key): + """Returns the coordinates (partition, index_within_partition) of the + provided key in the index. + + Args: + key: item to get coordinates of + + Returns: + Pandas object with the keys specified. If key is a single object + it will be a pd.Series with items `partition` and + `index_within_partition`, and if key is a slice it will be a + pd.DataFrame with said items as columns. + """ + locs = self.index.get_loc(key) + # locs may be a single int, a slice, or a boolean mask. + # Convert here to iterable of integers + loc_idxs = pd.RangeIndex(len(self.index))[locs] + # TODO: Investigate "modify view/copy" warning + ret_obj = self._coord_df.loc[key] + ret_obj['partition'] = 0 + ret_obj['index_within_partition'] = loc_idxs + return ret_obj + + def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, + group_keys=True, squeeze=False, **kwargs): + raise NotImplementedError() + + def insert(self, key, loc=None, partition=None, + index_within_partition=None): + """Inserts a key at a certain location in the index, or a certain coord + in a partition. Called with either `loc` or `partition` and + `index_within_partition`. If called with both, `loc` will be used. 
+ + Args: + key: item to insert into index + loc: location to insert into index + partition: partition to insert into + index_within_partition: index within partition to insert into + + Returns: + DataFrame with coordinates of insert + """ + # Generate new index + new_index = self.index.insert(loc, key) + + # Make new empty coord_df + self._coord_df = pd.DataFrame(index=new_index) + + # Shouldn't really need this, but here to maintain API consistency + return pd.DataFrame({'partition': 0, 'index_within_partition': loc}, + index=[key])