[DataFrame] Encapsulate index and lengths into separate class (#1849)

* baseline impl for index_df.py

* added skeleton for index_df.py

* initial impl index_df

* separate out partition and non-partition impls

* add len function

* drop returns index_df slice of dropped indices

* housecleaning

* Integrate index overhaul

* Rename index df to index metadata

* Fix flake8 issues

* Addressing issues

* fix import issue

* Added metadata passing to constructor
This commit is contained in:
Patrick Yang
2018-04-10 14:30:20 -07:00
committed by Devin Petersohn
parent 405b05d58a
commit 521b549e4a
2 changed files with 440 additions and 243 deletions
+99 -243
View File
@@ -15,9 +15,7 @@ from pandas.core.dtypes.common import (
is_list_like,
is_numeric_dtype,
is_timedelta64_dtype)
from pandas.core.indexing import (
check_bool_indexer,
convert_to_index_sliceable)
from pandas.core.indexing import check_bool_indexer
import warnings
import numpy as np
@@ -29,19 +27,18 @@ from .utils import (
_map_partitions,
_partition_pandas_dataframe,
to_pandas,
_build_index,
_blocks_to_col,
_blocks_to_row,
_build_columns,
_create_block_partitions)
from . import get_npartitions
from .index_metadata import _IndexMetadata
class DataFrame(object):
def __init__(self, data=None, index=None, columns=None, dtype=None,
copy=False, col_partitions=None, row_partitions=None,
block_partitions=None):
block_partitions=None, row_metadata=None, col_metadata=None):
"""Distributed DataFrame object backed by Pandas dataframes.
Args:
@@ -60,6 +57,10 @@ class DataFrame(object):
row_partitions ([ObjectID]): The list of ObjectIDs that contain the
row dataframe partitions.
block_partitions: A 2D numpy array of block partitions.
row_metadata (_IndexMetadata):
Metadata for the new dataframe's rows
col_metadata (_IndexMetadata):
Metadata for the new dataframe's columns
"""
# Check type of data and use appropriate constructor
if data is not None or (col_partitions is None and
@@ -89,18 +90,27 @@ class DataFrame(object):
"Columns not defined, must define columns for internal " \
"DataFrame creations"
self._row_metadata = self._col_metadata = None
if block_partitions is not None:
# put in numpy array here to make accesses easier since it's 2D
self._block_partitions = np.array(block_partitions)
if row_metadata is not None:
self._row_metadata = row_metadata.copy()
if col_metadata is not None:
self._col_metadata = col_metadata.copy()
assert self._block_partitions.ndim == 2, \
"Block Partitions must be 2D."
else:
if row_partitions is not None:
axis = 0
partitions = row_partitions
if row_metadata is not None:
self._row_metadata = row_metadata.copy()
elif col_partitions is not None:
axis = 1
partitions = col_partitions
if col_metadata is not None:
self._col_metadata = col_metadata.copy()
self._block_partitions = \
_create_block_partitions(partitions, axis=axis,
@@ -114,10 +124,13 @@ class DataFrame(object):
axis=axis ^ 1)
# Create the row and column index objects for using our partitioning.
self._row_lengths, self._row_index = \
_build_index.remote(self._block_partitions[:, 0], index)
self._col_lengths, self._col_index = \
_build_columns.remote(self._block_partitions[0, :], columns)
# If the objects haven't been inherited, then generate them
if not self._row_metadata:
self._row_metadata = _IndexMetadata(self._block_partitions[:, 0],
index=index, axis=0)
if not self._col_metadata:
self._col_metadata = _IndexMetadata(self._block_partitions[0, :],
index=columns, axis=1)
def _get_row_partitions(self):
return [_blocks_to_row.remote(*part)
@@ -145,7 +158,7 @@ class DataFrame(object):
return repr(self)
def __repr__(self):
if sum(self._row_lengths) < 60:
if len(self._row_metadata) < 60:
result = repr(to_pandas(self))
return result
@@ -154,7 +167,7 @@ class DataFrame(object):
new_dfs = _map_partitions(lambda df: df.head(n),
df)
index = self._row_index.head(n).index
index = self.index[:n]
pd_head = pd.concat(ray.get(new_dfs), axis=1, copy=False)
pd_head.index = index
pd_head.columns = self.columns
@@ -166,7 +179,7 @@ class DataFrame(object):
new_dfs = _map_partitions(lambda df: df.tail(n),
df)
index = self._row_index.tail(n).index
index = self.index[-n:]
pd_tail = pd.concat(ray.get(new_dfs), axis=1, copy=False)
pd_tail.index = index
pd_tail.columns = self.columns
@@ -198,10 +211,7 @@ class DataFrame(object):
Returns:
The union of all indexes across the partitions.
"""
if isinstance(self._row_index, pd.core.indexes.range.RangeIndex) or \
isinstance(self._row_index, pd.core.indexes.base.Index):
return self._row_index
return self._row_index.index
return self._row_metadata.index
def _set_index(self, new_index):
"""Set the index for this DataFrame.
@@ -209,47 +219,17 @@ class DataFrame(object):
Args:
new_index: The new index to set this
"""
if isinstance(self._row_index, pd.core.indexes.range.RangeIndex) or \
isinstance(self._row_index, pd.core.indexes.base.Index):
self._row_index = new_index
else:
self._row_index.index = new_index
self._row_metadata.index = new_index
index = property(_get_index, _set_index)
def _get__row_index(self):
"""Get the _row_index for this DataFrame.
Returns:
The default index.
"""
if self._row_index_cache is None:
return None
if isinstance(self._row_index_cache, ray.local_scheduler.ObjectID):
self._row_index_cache = ray.get(self._row_index_cache)
return self._row_index_cache
def _set__row_index(self, new__index):
"""Set the _row_index for this DataFrame.
Args:
new__index: The new default index to set.
"""
self._row_index_cache = new__index
_row_index = property(_get__row_index, _set__row_index)
def _get_columns(self):
"""Get the columns for this DataFrame.
Returns:
The union of all indexes across the partitions.
"""
if isinstance(self._col_index, pd.core.indexes.range.RangeIndex) or \
isinstance(self._col_index, pd.core.indexes.base.Index):
return self._col_index
return self._col_index.index
return self._col_metadata.index
def _set_columns(self, new_index):
"""Set the columns for this DataFrame.
@@ -257,101 +237,16 @@ class DataFrame(object):
Args:
new_index: The new index to set this
"""
if isinstance(self._col_index, pd.core.indexes.range.RangeIndex) or \
isinstance(self._col_index, pd.core.indexes.base.Index):
self._col_index = new_index
return
self._col_index.index = new_index
self._col_metadata.index = new_index
columns = property(_get_columns, _set_columns)
def _get__col_index(self):
"""Get the _col_index for this DataFrame.
Returns:
The default index.
"""
if self._col_index_cache is None:
return None
if isinstance(self._col_index_cache, ray.local_scheduler.ObjectID):
self._col_index_cache = ray.get(self._col_index_cache)
return self._col_index_cache
def _set__col_index(self, new__index):
"""Set the _col_index for this DataFrame.
Args:
new__index: The new default index to set.
"""
self._col_index_cache = new__index
_col_index = property(_get__col_index, _set__col_index)
def _get_row_lengths(self):
"""Gets the lengths for each partition and caches it if it wasn't.
Returns:
A list of integers representing the length of each partition.
"""
if self._row_length_cache is None:
return None
if isinstance(self._row_length_cache, ray.local_scheduler.ObjectID):
self._row_length_cache = ray.get(self._row_length_cache)
elif isinstance(self._row_length_cache, list) and \
isinstance(self._row_length_cache[0],
ray.local_scheduler.ObjectID):
self._row_length_cache = ray.get(self._row_length_cache)
return self._row_length_cache
def _set_row_lengths(self, lengths):
"""Sets the lengths of each partition for this DataFrame.
We use this because we can compute it when creating the DataFrame.
Args:
lengths ([ObjectID or Int]): A list of lengths for each
partition, in order.
"""
self._row_length_cache = lengths
_row_lengths = property(_get_row_lengths, _set_row_lengths)
def _get_col_lengths(self):
"""Gets the lengths for each partition and caches it if it wasn't.
Returns:
A list of integers representing the length of each partition.
"""
if self._col_length_cache is None:
return None
if isinstance(self._col_length_cache, ray.local_scheduler.ObjectID):
self._col_length_cache = ray.get(self._col_length_cache)
elif isinstance(self._col_length_cache, list) and \
isinstance(self._col_length_cache[0],
ray.local_scheduler.ObjectID):
self._col_length_cache = ray.get(self._col_length_cache)
return self._col_length_cache
def _set_col_lengths(self, lengths):
"""Sets the lengths of each partition for this DataFrame.
We use this because we can compute it when creating the DataFrame.
Args:
lengths ([ObjectID or Int]): A list of lengths for each
partition, in order.
"""
self._col_length_cache = lengths
_col_lengths = property(_get_col_lengths, _set_col_lengths)
def _arithmetic_helper(self, remote_func, axis, level=None):
# TODO: We don't support `level` right now
if level is not None:
raise NotImplementedError("Level not yet supported.")
axis = self._row_index._get_axis_number(axis) if axis is not None \
axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
else 0
oid_series = ray.get(_map_partitions(remote_func,
@@ -362,12 +257,9 @@ class DataFrame(object):
# We use the index to get the internal index.
oid_series = [(oid_series[i], i) for i in range(len(oid_series))]
for df, index in oid_series:
this_partition = \
self._col_index[self._col_index['partition'] == index]
df.index = this_partition[
this_partition['index_within_partition'].isin(df.index)
].index
for df, partition in oid_series:
this_partition = self._col_metadata.partition_series(partition)
df.index = this_partition[this_partition.isin(df.index)].index
result_series = pd.concat([obj[0] for obj in oid_series],
axis=0, copy=False)
@@ -502,10 +394,10 @@ class DataFrame(object):
if row_partitions is not None or col_partitions is not None:
# At least one partition list is being updated, so recompute
# lengths and indices
self._row_lengths, self._row_index = \
_build_index.remote(self._block_partitions[:, 0], index)
self._col_lengths, self._col_index = \
_build_columns.remote(self._block_partitions[0, :], columns)
self._row_metadata = _IndexMetadata(self._block_partitions[:, 0],
index=index, axis=0)
self._col_metadata = _IndexMetadata(self._block_partitions[0, :],
index=columns, axis=1)
def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
@@ -920,7 +812,7 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def _cumulative_helper(self, func, axis):
axis = self._row_index._get_axis_number(axis) if axis is not None \
axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
else 0
if axis == 0:
@@ -1024,12 +916,9 @@ class DataFrame(object):
# We use the index to get the internal index.
parts = [(parts[i], i) for i in range(len(parts))]
for df, index in parts:
this_partition = \
self._col_index[self._col_index['partition'] == index]
df.columns = this_partition[
this_partition['index_within_partition'].isin(df.columns)
].index
for df, partition in parts:
this_partition = self._col_metadata.partition_series(partition)
df.columns = this_partition[this_partition.isin(df.columns)].index
# Remove index from tuple
result = pd.concat([obj[0] for obj in parts], axis=1, copy=False)
@@ -1085,21 +974,24 @@ class DataFrame(object):
if index is not None or columns is not None:
raise ValueError("Cannot specify both 'labels' and "
"'index'/'columns'")
axis = self._row_index._get_axis_name(axis)
axis = pd.DataFrame()._get_axis_name(axis)
axes = {axis: labels}
elif index is not None or columns is not None:
axes, _ = self._row_index._construct_axes_from_arguments((index,
columns),
{})
axes, _ = pd.DataFrame()._construct_axes_from_arguments((index,
columns),
{})
else:
raise ValueError("Need to specify at least one of 'labels', "
"'index' or 'columns'")
obj = self.copy()
def drop_helper(obj, axis, label):
# TODO(patyang): If you drop from the index first, you can do it
# in batch by returning the dropped items. Likewise coords.drop
# leaves the coords df in an inconsistent state.
if axis == 'index':
try:
coords = obj._row_index.loc[label]
coords = obj._row_metadata[label]
if isinstance(coords, pd.DataFrame):
partitions = list(coords['partition'])
indexes = list(coords['index_within_partition'])
@@ -1121,18 +1013,14 @@ class DataFrame(object):
# The decrement here is because we're dropping one at a
# time and the index is automatically updated when we
# convert back to blocks.
obj._row_index = obj._row_index.copy()
obj._row_index.loc[
(obj._row_index.partition == part) &
(obj._row_index.index_within_partition > index),
'index_within_partition'] -= 1
obj._row_metadata.squeeze(part, index)
obj._row_index.drop(labels=label, axis=0, inplace=True)
obj._row_metadata.drop(labels=label)
except KeyError:
return obj
else:
try:
coords = obj._col_index.loc[label]
coords = obj._col_metadata[label]
if isinstance(coords, pd.DataFrame):
partitions = list(coords['partition'])
indexes = list(coords['index_within_partition'])
@@ -1154,13 +1042,9 @@ class DataFrame(object):
# The decrement here is because we're dropping one at a
# time and the index is automatically updated when we
# convert back to blocks.
obj._col_index = obj._col_index.copy()
obj._col_index.loc[
(obj._col_index.partition == part) &
(obj._col_index.index_within_partition > index),
'index_within_partition'] -= 1
obj._col_metadata.squeeze(part, index)
obj._col_index.drop(labels=label, axis=0, inplace=True)
obj._col_metadata.drop(labels=label)
except KeyError:
return obj
@@ -1189,8 +1073,8 @@ class DataFrame(object):
if not inplace:
return obj
else:
self._row_index = obj._row_index
self._col_index = obj._col_index
self._row_metadata = obj._row_metadata
self._col_metadata = obj._col_metadata
self._block_partitions = obj._block_partitions
def drop_duplicates(self, subset=None, keep='first', inplace=False):
@@ -1223,14 +1107,15 @@ class DataFrame(object):
results = []
other_partition = None
other_df = None
for i, idx in other._row_index.iterrows():
# TODO: Make the appropriate coord df accessor methods for this fxn
for i, idx in other._row_metadata._coord_df.iterrows():
if idx['partition'] != other_partition:
other_df = ray.get(other._row_partitions[idx['partition']])
other_partition = idx['partition']
# TODO: group series here into full df partitions to reduce
# the number of remote calls to helper
other_series = other_df.iloc[idx['index_within_partition']]
curr_index = self._row_index.iloc[i]
curr_index = self._row_metadata._coord_df.iloc[i]
curr_df = self._row_partitions[int(curr_index['partition'])]
results.append(_deploy_func.remote(helper,
curr_df,
@@ -1299,7 +1184,8 @@ class DataFrame(object):
inplace = validate_bool_kwarg(inplace, "inplace")
new_rows = _map_partitions(eval_helper, self._row_partitions)
columns_copy = self._col_index.T.copy()
# TODO: This doesn't work if the expression is not an assignment
columns_copy = self._col_metadata._coord_df.T.copy()
columns_copy.eval(expr, inplace=True, **kwargs)
columns = columns_copy.columns
@@ -1369,7 +1255,7 @@ class DataFrame(object):
inplace = validate_bool_kwarg(inplace, 'inplace')
axis = self._row_index._get_axis_number(axis) \
axis = pd.DataFrame()._get_axis_number(axis) \
if axis is not None \
else 0
@@ -1392,12 +1278,10 @@ class DataFrame(object):
else:
new_obj = self.copy()
if axis == 0:
parts = new_obj._col_partitions
idx_obj = new_obj._col_index
else:
parts = new_obj._row_partitions
idx_obj = new_obj._row_index
parts, coords_obj = (new_obj._col_partitions,
new_obj._col_metadata) if axis == 0 else \
(new_obj._row_partitions,
new_obj._row_metadata)
if isinstance(value, (pd.Series, dict)):
new_vals = {}
@@ -1405,7 +1289,7 @@ class DataFrame(object):
for val in value:
# Get the local index for the partition
try:
part, index = idx_obj.loc[val]
part, index = coords_obj[val]
# Pandas ignores these errors so we will suppress them too.
except KeyError:
continue
@@ -1421,8 +1305,8 @@ class DataFrame(object):
# Not every partition was changed, so we put everything back that
# was not changed and update those that were.
new_parts = [parts[i] if idx_obj.index[i] not in new_vals
else new_vals[idx_obj.index[i]]
new_parts = [parts[i] if coords_obj.index[i] not in new_vals
else new_vals[coords_obj.index[i]]
for i in range(len(parts))]
else:
new_parts = _map_partitions(lambda df: df.fillna(
@@ -1461,9 +1345,7 @@ class DataFrame(object):
Returns:
scalar: type of index
"""
if self._row_index is not None:
return self._row_index.first_valid_index()
return None
return self._row_metadata.first_valid_index()
def floordiv(self, other, axis='columns', level=None, fill_value=None):
raise NotImplementedError(
@@ -1561,15 +1443,13 @@ class DataFrame(object):
Returns:
A new dataframe with the first n rows of the dataframe.
"""
sizes = self._row_lengths
if n >= sum(sizes):
if n >= len(self._row_metadata):
return self.copy()
new_dfs = _map_partitions(lambda df: df.head(n),
self._col_partitions)
index = self._row_index.head(n).index
index = self._row_metadata.index[:n]
return DataFrame(col_partitions=new_dfs,
columns=self.columns,
@@ -1662,27 +1542,25 @@ class DataFrame(object):
if loc < 0:
raise ValueError("unbounded slice")
# Perform insert on a specific column partition
# Determine which column partition to place it in, and where in that
# partition
col_cum_lens = np.cumsum(self._col_lengths)
col_part_idx = np.digitize(loc, col_cum_lens[:-1])
col_part_loc = loc - np.asscalar(
np.concatenate(([0], col_cum_lens))[col_part_idx])
partition, index_within_partition = \
self._col_metadata.insert(column, loc)
# Deploy insert function to specific column partition, and replace that
# column
def insert_col_part(df):
df.insert(col_part_loc, column, value, allow_duplicates)
df.insert(index_within_partition, column, value, allow_duplicates)
return df
print('partition:', partition)
print('i_w_partition', index_within_partition)
print('df:\n', ray.get(self._col_partitions[partition]))
new_obj = _deploy_func.remote(insert_col_part,
self._col_partitions[col_part_idx])
self._col_partitions[partition])
new_cols = [self._col_partitions[i]
if i != col_part_idx
if i != partition
else new_obj
for i in range(len(self._col_partitions))]
new_col_names = self._col_index.index.insert(loc, column)
new_col_names = self.columns.insert(loc, column)
self._update_inplace(col_partitions=new_cols, columns=new_col_names)
@@ -1816,9 +1694,7 @@ class DataFrame(object):
Returns:
scalar: type of index
"""
if self._row_index is not None:
return self._row_index.last_valid_index()
return None
return self._row_metadata.last_valid_index()
def le(self, other, axis='columns', level=None):
raise NotImplementedError(
@@ -2236,8 +2112,7 @@ class DataFrame(object):
if axes_is_columns:
renamed.columns.name = mapper
else:
renamed._row_index.rename_axis(mapper, axis=axis, copy=copy,
inplace=True)
renamed.index.name = mapper
if not inplace:
return renamed
@@ -2257,7 +2132,7 @@ class DataFrame(object):
if axes_is_columns:
renamed.columns.set_names(name)
else:
renamed._row_index.set_names(name)
renamed.index.set_names(name)
if not inplace:
return renamed
@@ -2333,10 +2208,7 @@ class DataFrame(object):
return values
# We're building a new default index dataframe for use later.
_, new_index = \
_build_index.remote(new_obj._block_partitions[:, 0], None)
new_index = ray.get(new_index).index
new_index = pd.RangeIndex(len(self))
if level is not None:
if not isinstance(level, (tuple, list)):
level = [level]
@@ -2485,7 +2357,7 @@ class DataFrame(object):
FutureWarning, stacklevel=2)
inplace = True
if inplace:
setattr(self, self._row_index._get_axis_name(axis), labels)
setattr(self, pd.DataFrame()._get_axis_name(axis), labels)
else:
obj = self.copy()
obj.set_axis(labels, axis=axis, inplace=True)
@@ -2666,15 +2538,13 @@ class DataFrame(object):
Returns:
A new dataframe with the last n rows of this dataframe.
"""
sizes = self._row_lengths
if n >= sum(sizes):
if n >= len(self._row_metadata):
return self
new_dfs = _map_partitions(lambda df: df.tail(n),
self._col_partitions)
index = self._row_index.tail(n).index
index = self._row_metadata.index[-n:]
return DataFrame(col_partitions=new_dfs,
columns=self.columns,
index=index)
@@ -2921,7 +2791,7 @@ class DataFrame(object):
pass
# see if we can slice the rows
indexer = convert_to_index_sliceable(self._row_index, key)
indexer = self._row_metadata.convert_to_index_sliceable(key)
if indexer is not None:
raise NotImplementedError("To contribute to Pandas on Ray, please"
"visit github.com/ray-project/ray.")
@@ -2941,7 +2811,8 @@ class DataFrame(object):
return self._getitem_column(key)
def _getitem_column(self, key):
partition = self._col_index.loc[key].loc['partition']
# may result in multiple columns?
partition = self._col_metadata[key, 'partition']
result = ray.get(self._getitem_indiv_col(key, partition))
result.name = key
result.index = self.index
@@ -2983,7 +2854,7 @@ class DataFrame(object):
index=index)
def _getitem_indiv_col(self, key, part):
loc = self._col_index.loc[key]
loc = self._col_metadata[key]
if isinstance(loc, pd.Series):
index = loc[loc['partition'] == part]
else:
@@ -3019,7 +2890,7 @@ class DataFrame(object):
Returns:
Returns an integer length of the dataframe object.
"""
return sum(self._row_lengths)
return len(self._row_metadata)
def __unicode__(self):
raise NotImplementedError(
@@ -3126,7 +2997,7 @@ class DataFrame(object):
del_helper, self._row_partitions, to_delete)
# This structure is used to get the correct index inside the partition.
del_df = self._col_index.loc[key]
del_df = self._col_metadata[key]
# We need to standardize between multiple and single occurrences in the
# columns. Putting single occurrences in a pd.DataFrame and transposing
@@ -3136,9 +3007,9 @@ class DataFrame(object):
# Cast cols as pd.Series as duplicate columns mean result may be
# np.int64 or pd.Series
col_parts_to_del = pd.Series(
self._col_index.loc[key, 'partition']).unique()
self._col_index.drop(key, inplace=True)
col_parts_to_del = \
pd.Series(self._col_metadata[key, 'partition']).unique()
self._col_metadata.drop(key)
for i in col_parts_to_del:
# Compute the correct index inside the partition to delete.
to_delete_in_partition = \
@@ -3147,22 +3018,7 @@ class DataFrame(object):
self._col_partitions[i] = _deploy_func.remote(
del_helper, self._col_partitions[i], to_delete_in_partition)
partition_mask = (self._col_index['partition'] == i)
# Since we are replacing columns with RangeIndex inside the
# partition, we have to make sure that our reference to it is
# updated as well.
try:
self._col_index.loc[partition_mask,
'index_within_partition'] = [
p for p in range(sum(partition_mask))]
except ValueError:
# Copy the arrow sealed dataframe so we can mutate it.
# We only do this the first time we try to mutate the sealed.
self._col_index = self._col_index.copy()
self._col_index.loc[partition_mask,
'index_within_partition'] = [
p for p in range(sum(partition_mask))]
self._col_metadata.reset_partition_coords(col_parts_to_del)
def __finalize__(self, other, method=None, **kwargs):
raise NotImplementedError(
+341
View File
@@ -0,0 +1,341 @@
import pandas as pd
import numpy as np
import ray
from .utils import (
_build_index,
_build_columns)
from pandas.core.indexing import convert_to_index_sliceable
class _IndexMetadataBase(object):
    """Wrapper for Pandas indexes in Ray DataFrames. Handles all of the
    metadata specific to the axis of partition (setting indexes,
    calculating the index within partition of a value, etc.) since the
    dataframe may be partitioned across either axis. This way we can unify the
    possible index operations over one axis-agnostic interface.

    This class is the abstract superclass for IndexMetadata and
    WrappingIndexMetadata, which handle indexes along the partitioned and
    non-partitioned axes, respectively. Subclasses are expected to assign
    `_coord_df` (the coordinate dataframe whose index is the wrapped index)
    and to implement `coords_of`, `groupby` and `insert`.

    IMPORTANT NOTE: Apart from `copy`, all operations, as implemented, are
    inplace.
    """

    def _get__coord_df(self):
        """Get the coordinate dataframe, resolving it on first access.

        If the cached value is still a Ray ObjectID it is fetched with
        `ray.get`, and the fetched object replaces the ObjectID in the cache.

        Returns:
            The cached coordinate dataframe.
        """
        if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
            self._coord_df_cache = ray.get(self._coord_df_cache)
        return self._coord_df_cache

    def _set__coord_df(self, coord_df):
        """Set the coordinate dataframe.

        Args:
            coord_df: The coordinate dataframe, or an ObjectID that resolves
                to it.
        """
        self._coord_df_cache = coord_df

    _coord_df = property(_get__coord_df, _set__coord_df)

    def _get_index(self):
        """Get the index wrapped by this IndexMetadata.

        Returns:
            The index wrapped by this IndexMetadata
        """
        return self._coord_df.index

    def _set_index(self, new_index):
        """Set the index wrapped by this IndexMetadata.

        Args:
            new_index: The new index to wrap
        """
        self._coord_df.index = new_index

    index = property(_get_index, _set_index)

    def copy(self):
        """Create an independent copy of this IndexMetadata.

        Locally held dataframes and lists are duplicated so that inplace
        operations on the copy (or on the original) do not leak into the
        other object. Every other attribute, in particular ObjectIDs that
        have not been fetched yet, is shared between both objects.

        Returns:
            A new object of the same class as this one.
        """
        duplicate = self.__class__.__new__(self.__class__)
        for name, value in self.__dict__.items():
            if isinstance(value, pd.DataFrame):
                value = value.copy()
            elif isinstance(value, list):
                value = list(value)
            duplicate.__dict__[name] = value
        return duplicate

    def coords_of(self, key):
        """Get the coordinates of `key`. Must be provided by subclasses."""
        raise NotImplementedError()

    def __getitem__(self, key):
        """Shorthand for `coords_of(key)`."""
        return self.coords_of(key)

    def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
                group_keys=True, squeeze=False, **kwargs):
        """Group the wrapped index. Must be provided by subclasses."""
        raise NotImplementedError()

    def __len__(self):
        """Get the number of entries in the coordinate dataframe."""
        return len(self._coord_df)

    def first_valid_index(self):
        """Get `first_valid_index()` of the coordinate dataframe."""
        return self._coord_df.first_valid_index()

    def last_valid_index(self):
        """Get `last_valid_index()` of the coordinate dataframe."""
        return self._coord_df.last_valid_index()

    def insert(self, key, loc=None, partition=None,
               index_within_partition=None):
        """Insert `key` into the index. Must be provided by subclasses."""
        raise NotImplementedError()

    def drop(self, labels, errors='raise'):
        """Drop the specified labels from the IndexMetadata

        Args:
            labels (scalar or list-like):
                The labels to drop
            errors ('raise' or 'ignore'):
                If 'ignore', suppress errors for when labels don't exist

        Returns:
            Coordinates of the dropped labels, as returned by `coords_of`.
            If a label is missing and `errors` is 'ignore', the rows of the
            coordinate dataframe for the labels that do exist are returned
            instead.

        Raises:
            KeyError: If a label does not exist and `errors` is not 'ignore'.
        """
        # TODO(patyang): This produces inconsistent indexes.
        try:
            dropped = self.coords_of(labels)
        except KeyError:
            # The coordinate lookup knows nothing about `errors`, so honor
            # 'ignore' here instead of failing before the drop is attempted.
            if errors != 'ignore':
                raise
            wanted = labels if pd.api.types.is_list_like(labels) \
                else [labels]
            dropped = self._coord_df[self._coord_df.index.isin(wanted)]

        self._coord_df = self._coord_df.drop(labels, errors=errors)
        return dropped

    def rename_index(self, mapper):
        """Rename the index.

        Args:
            mapper: name to rename the index as
        """
        self._coord_df = self._coord_df.rename_axis(mapper, axis=0)

    def convert_to_index_sliceable(self, key):
        """Converts and performs error checking on the passed slice

        Args:
            key: slice to convert and check

        Returns:
            The result of pandas' `convert_to_index_sliceable` for the
            coordinate dataframe and `key`.
        """
        return convert_to_index_sliceable(self._coord_df, key)
class _IndexMetadata(_IndexMetadataBase):
    """IndexMetadata implementation for index across a partitioned axis. This
    implementation assumes the underlying index lies across multiple
    partitions.

    Two pieces of state are kept, both of which may still be Ray ObjectIDs
    until they are first read: `_coord_df`, which is looked up by label and
    has the columns `partition` and `index_within_partition`, and `_lengths`,
    the length of each partition in order.
    """

    def __init__(self, dfs, index=None, axis=0):
        """Inits a IndexMetadata from Ray DataFrame partitions

        Args:
            dfs ([ObjectID]): ObjectIDs of dataframe partitions
            index (pd.Index): Index of the Ray DataFrame.
            axis: Axis of partition (0=row partitions, 1=column partitions)

        Returns:
            A IndexMetadata backed by the specified pd.Index, partitioned off
            specified partitions
        """
        builder = _build_index if axis == 0 else _build_columns
        lengths_oid, coord_df_oid = builder.remote(dfs, index)
        self._coord_df = coord_df_oid
        self._lengths = lengths_oid

    def _get__lengths(self):
        """Get the partition lengths, resolving them on first access.

        The cache may hold a single ObjectID or a list of ObjectIDs; either
        is fetched with `ray.get` and replaced by the fetched value.

        Returns:
            The cached partition lengths.
        """
        cache = self._lengths_cache
        object_id = ray.local_scheduler.ObjectID
        # The length guard keeps an empty list from being indexed.
        unresolved = isinstance(cache, object_id) or \
            (isinstance(cache, list) and len(cache) > 0 and
             isinstance(cache[0], object_id))
        if unresolved:
            self._lengths_cache = ray.get(cache)
        return self._lengths_cache

    def _set__lengths(self, lengths):
        """Set the partition lengths.

        Args:
            lengths: The lengths, an ObjectID, or a list of ObjectIDs.
        """
        self._lengths_cache = lengths

    _lengths = property(_get__lengths, _set__lengths)

    def coords_of(self, key):
        """Returns the coordinates (partition, index_within_partition) of the
        provided key in the index. Can be called on its own or implicitly
        through __getitem__

        Args:
            key:
                item to get coordinates of. Can also be a tuple of item
                and {partition, index_within_partition} if caller only
                needs one of the coordinates

        Returns:
            Pandas object with the keys specified. If key is a single object
            it will be a pd.Series with items `partition` and
            `index_within_partition`, and if key is a slice or if the key is
            duplicate it will be a pd.DataFrame with said items as columns.
        """
        return self._coord_df.loc[key]

    def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
                group_keys=True, squeeze=False, **kwargs):
        """Group the coordinate dataframe and reassemble the groups.

        All arguments are forwarded unchanged to the pandas `groupby` of the
        coordinate dataframe; every group is then returned as-is through
        `apply`.

        Returns:
            The result of applying an identity slice to each group.
        """
        grouped = self._coord_df.groupby(by=by, axis=axis, level=level,
                                         as_index=as_index, sort=sort,
                                         group_keys=group_keys,
                                         squeeze=squeeze, **kwargs)
        assignments_df = grouped.apply(lambda x: x[:])
        return assignments_df

    def partition_series(self, partition):
        """Get the `index_within_partition` values of one partition.

        Args:
            partition: The partition whose entries are selected.

        Returns:
            The `index_within_partition` column restricted to the labels
            whose `partition` equals the given partition.
        """
        return self[self._coord_df['partition'] == partition,
                    'index_within_partition']

    def __len__(self):
        # Hard to say if this is faster than IndexMetadataBase.__len__ if
        # self._coord_df is non-resident
        return sum(self._lengths)

    def drop(self, labels, errors='raise'):
        """Drop the specified labels from the IndexMetadata

        Args:
            labels (scalar or list-like):
                The labels to drop
            errors ('raise' or 'ignore'):
                If 'ignore', suppress errors for when labels don't exist

        Returns:
            Coordinates of the dropped labels
        """
        dropped = _IndexMetadataBase.drop(self, labels, errors=errors)

        # `__len__` is answered from `_lengths`, so the lengths have to
        # shrink together with the coordinate dataframe.
        lengths = list(self._lengths)
        for partition in np.atleast_1d(dropped['partition']):
            lengths[int(partition)] -= 1
        self._lengths = lengths
        return dropped

    def reset_partition_coords(self, partitions=None):
        """Renumber `index_within_partition` from 0 inside partitions.

        Args:
            partitions (scalar or list-like):
                The partitions to renumber. If None, every partition found
                in the coordinate dataframe is renumbered.
        """
        if partitions is None:
            partitions = self._coord_df['partition'].unique()

        for partition in np.atleast_1d(partitions):
            partition_mask = (self._coord_df['partition'] == partition)
            new_coords = list(range(int(partition_mask.sum())))

            # Since we are replacing columns with RangeIndex inside the
            # partition, we have to make sure that our reference to it is
            # updated as well.
            try:
                self._coord_df.loc[partition_mask,
                                   'index_within_partition'] = new_coords
            except ValueError:
                # Copy the arrow sealed dataframe so we can mutate it.
                # We only do this the first time we try to mutate the sealed.
                self._coord_df = self._coord_df.copy()
                self._coord_df.loc[partition_mask,
                                   'index_within_partition'] = new_coords

    def insert(self, key, loc=None, partition=None,
               index_within_partition=None):
        """Computes the coordinates at which a key would be inserted. Called
        with either `loc` or `partition` and `index_within_partition`. If
        called with both, `loc` will be used.

        Note that, for now, the stored metadata is not modified.

        Args:
            key: item to insert into index
            loc: location to insert into index
            partition: partition to insert into
            index_within_partition: index within partition to insert into

        Returns:
            Tuple (partition, index_within_partition) of the insert location.
            When `loc` is None the passed-in coordinates are returned
            unchanged.

        Raises:
            IndexError: If `loc` lies past the end of the index.
        """
        # Determine which partition to place it in, and where in that partition
        if loc is not None:
            cum_lens = np.cumsum(self._lengths)
            total = cum_lens[-1] if len(cum_lens) > 0 else 0
            # Inserting at `total` appends to the last partition; anything
            # larger does not exist.
            if loc > total:
                raise IndexError("index {0} is out of bounds".format(loc))

            partition = np.digitize(loc, cum_lens[:-1])
            first_in_partition = \
                int(cum_lens[partition - 1]) if partition > 0 else 0
            index_within_partition = loc - first_in_partition

        # TODO: Stop-gap solution until we begin passing IndexMetadatas.
        # Once metadata is passed around, this method also has to update
        # `_coord_df` (add a row for `key` and shift every later entry of
        # the same partition by one) and `_lengths`.
        return partition, index_within_partition

    def squeeze(self, partition, index_within_partition):
        """Shift coordinates down by one after a position in a partition.

        Every entry of `partition` whose `index_within_partition` is greater
        than the given one is decremented by 1.

        Args:
            partition: The partition to update.
            index_within_partition: The position after which entries shift.
        """
        # Work on a copy, the cached dataframe may not be writable.
        self._coord_df = self._coord_df.copy()

        in_partition = self._coord_df.partition == partition
        after_position = \
            self._coord_df.index_within_partition > index_within_partition
        self._coord_df.loc[in_partition & after_position,
                           'index_within_partition'] -= 1
class _WrappingIndexMetadata(_IndexMetadata):
    """IndexMetadata implementation for index across a non-partitioned axis.
    This implementation assumes the underlying index lies across one partition.

    NOTE(review): the coordinate dataframe built here has no `partition` or
    `index_within_partition` columns, so the inherited helpers that read
    those columns (`partition_series`, `squeeze`, `reset_partition_coords`)
    look unusable on this class -- confirm before relying on them.
    """

    def __init__(self, index):
        """Inits a IndexMetadata from Pandas Index only.

        Args:
            index (pd.Index): Index to wrap.

        Returns:
            A IndexMetadata backed by the specified pd.Index.
        """
        self._coord_df = pd.DataFrame(index=index)
        # Set _lengths as a dummy variable for future-proof method inheritance
        self._lengths = [len(index)]

    def __len__(self):
        # The coordinate dataframe is always held locally here, so it is the
        # cheapest and most reliable source for the length.
        return len(self._coord_df)

    def coords_of(self, key):
        """Returns the coordinates (partition, index_within_partition) of the
        provided key in the index

        Args:
            key: item to get coordinates of

        Returns:
            Pandas object with the keys specified. If key is a single object
            it will be a pd.Series with items `partition` and
            `index_within_partition`, and if the key occurs more than once
            it will be a pd.DataFrame with said items as columns. The
            partition is always 0.
        """
        locs = self.index.get_loc(key)
        # locs may be a single int, a slice, or a boolean mask.
        # Convert here to iterable of integers
        loc_idxs = pd.RangeIndex(len(self.index))[locs]

        # Copy the selection so the assignments below write to an object of
        # our own instead of a possible view of the coordinate dataframe.
        ret_obj = self._coord_df.loc[key].copy()
        ret_obj['partition'] = 0
        ret_obj['index_within_partition'] = loc_idxs
        return ret_obj

    def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
                group_keys=True, squeeze=False, **kwargs):
        """Not supported for an index over a non-partitioned axis."""
        raise NotImplementedError()

    def insert(self, key, loc=None, partition=None,
               index_within_partition=None):
        """Inserts a key at a certain location in the index, or a certain coord
        in a partition. Called with either `loc` or `partition` and
        `index_within_partition`. If called with both, `loc` will be used.

        Args:
            key: item to insert into index
            loc: location to insert into index
            partition: partition to insert into
            index_within_partition: index within partition to insert into

        Returns:
            DataFrame with coordinates of insert

        Raises:
            ValueError: If neither `loc` nor `index_within_partition` is
                given.
        """
        # With a single partition, the index within the partition is the
        # location in the index.
        if loc is None:
            loc = index_within_partition
        if loc is None:
            raise ValueError("Need to specify either 'loc' or "
                             "'index_within_partition'")

        # Generate new index
        new_index = self.index.insert(loc, key)

        # Make new empty coord_df
        self._coord_df = pd.DataFrame(index=new_index)
        # Keep the dummy lengths in step with the new index
        self._lengths = [len(new_index)]

        # Shouldn't really need this, but here to maintain API consistency
        return pd.DataFrame({'partition': 0, 'index_within_partition': loc},
                            index=[key])