[DataFrames] Updating Index implementation, performance improvements (#1598)

This commit is contained in:
Devin Petersohn
2018-02-25 13:39:28 -08:00
committed by Robert Nishihara
parent 31fefa20b7
commit 529397b35e
4 changed files with 205 additions and 163 deletions
+145 -85
View File
@@ -7,8 +7,6 @@ import numpy as np
import ray
import itertools
from .index import Index
class DataFrame(object):
@@ -20,22 +18,21 @@ class DataFrame(object):
partitions.
columns (pandas.Index): The column names for this dataframe, in
pandas Index object.
index (pandas.Index or list): The row index for this dataframe.
"""
assert(len(df) > 0)
self._df = df
# TODO: Clean up later.
# We will call get only when we access the object (and only once).
self._lengths = \
ray.get([_deploy_func.remote(_get_lengths, d) for d in self._df])
self._lengths = [_deploy_func.remote(_get_lengths, d)
for d in self._df]
self.columns = columns
if index is None:
self._index = self._default_index()
else:
self._index = index
# this _index object is a pd.DataFrame
# and we use that DataFrame's Index to index the rows.
self._index = self._default_index()
self._pd_index = None
if index is not None:
self.index = index
def __str__(self):
return "ray.DataFrame object"
@@ -49,10 +46,7 @@ class DataFrame(object):
Returns:
The union of all indexes across the partitions.
"""
if self._pd_index is None:
self._pd_index = Index.to_pandas(self._index)
return self._pd_index
return self._index.index
def _set_index(self, new_index):
"""Set the index for this DataFrame.
@@ -60,18 +54,47 @@ class DataFrame(object):
Args:
new_index: The new index to set this
"""
self._pd_index = None
self._index = Index.from_pandas(new_index, self._lengths)
self._index.index = new_index
def _default_index(self):
dest_indices = [(i, j)
for i in range(len(self._lengths))
for j in range(self._lengths[i])]
return Index({i: dest_indices[i] for i in range(len(dest_indices))},
pd.RangeIndex)
"""Create a default index, which is a RangeIndex
Returns:
The pd.RangeIndex object that represents this DataFrame.
"""
dest_indices = {"partition":
[i for i in range(len(self._lengths))
for j in range(self._lengths[i])],
"index_within_partition":
[j for i in range(len(self._lengths))
for j in range(self._lengths[i])]}
return pd.DataFrame(dest_indices)
index = property(_get_index, _set_index)
def _get_lengths(self):
    """Return the per-partition lengths, resolving them on first access.

    The cache is inspected through its first entry: if that entry is
    still a Ray ObjectID, the whole cache is passed to ``ray.get`` and
    the result replaces the cache, so later calls return it directly.

    Returns:
        A list of integers representing the length of each partition.
    """
    cached = self._length_cache
    still_remote = isinstance(cached[0], ray.local_scheduler.ObjectID)
    if not still_remote:
        return cached

    # Replace the object IDs with their fetched values.
    self._length_cache = ray.get(cached)
    return self._length_cache
def _set_lengths(self, lengths):
"""Sets the lengths of each partition for this DataFrame.
We use this because we can compute it when creating the DataFrame.
Args:
lengths ([ObjectID or Int]): A list of lengths for each
partition, in order.
"""
self._length_cache = lengths
_lengths = property(_get_lengths, _set_lengths)
@property
def size(self):
"""Get the number of elements in the DataFrame.
@@ -79,8 +102,7 @@ class DataFrame(object):
Returns:
The number of elements in the DataFrame.
"""
sizes = ray.get(self._map_partitions(lambda df: df.size)._df)
return sum(sizes)
return len(self.index) * len(self.columns)
@property
def ndim(self):
@@ -154,7 +176,7 @@ class DataFrame(object):
"""
return (len(self.index), len(self.columns))
def _map_partitions(self, func, *args):
def _map_partitions(self, func, index=None):
"""Apply a function on each partition.
Args:
@@ -165,8 +187,10 @@ class DataFrame(object):
"""
assert(callable(func))
new_df = [_deploy_func.remote(func, part) for part in self._df]
if index is None:
index = self.index
return DataFrame(new_df, self.columns, index=self._index)
return DataFrame(new_df, self.columns, index=index)
def add_prefix(self, prefix):
"""Add a prefix to each of the column names.
@@ -174,9 +198,8 @@ class DataFrame(object):
Returns:
A new DataFrame containing the new column names.
"""
new_dfs = self._map_partitions(lambda df: df.add_prefix(prefix))
new_cols = self.columns.map(lambda x: str(prefix) + str(x))
return DataFrame(new_dfs._df, new_cols, index=self._index)
return DataFrame(self._df, new_cols, index=self.index)
def add_suffix(self, suffix):
"""Add a suffix to each of the column names.
@@ -184,9 +207,8 @@ class DataFrame(object):
Returns:
A new DataFrame containing the new column names.
"""
new_dfs = self._map_partitions(lambda df: df.add_suffix(suffix))
new_cols = self.columns.map(lambda x: str(x) + str(suffix))
return DataFrame(new_dfs._df, new_cols, index=self._index)
return DataFrame(self._df, new_cols, index=self.index)
def applymap(self, func):
"""Apply a function to a DataFrame elementwise.
@@ -203,7 +225,7 @@ class DataFrame(object):
Returns:
A new DataFrame pointing to the same partitions as this one.
"""
return DataFrame(self._df, self.columns, index=self._index)
return DataFrame(self._df, self.columns, index=self.index)
def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
group_keys=True, squeeze=False, **kwargs):
@@ -221,8 +243,7 @@ class DataFrame(object):
A new DataFrame resulting from the groupby.
"""
indices = list(set(
[index for df in ray.get(self._df) for index in list(df.index)]))
indices = self.index.unique()
chunksize = int(len(indices) / len(self._df))
partitions = [_shuffle.remote(df, indices, chunksize)
@@ -238,7 +259,7 @@ class DataFrame(object):
shuffle[i].append(partitions[j][i])
new_dfs = [_local_groupby.remote(part, axis=axis) for part in shuffle]
return DataFrame(new_dfs, self.columns)
return DataFrame(new_dfs, self.columns, index=indices)
def reduce_by_index(self, func, axis=0):
"""Perform a reduction based on the row index.
@@ -250,7 +271,8 @@ class DataFrame(object):
Returns:
A new DataFrame with the result of the reduction.
"""
return self.groupby(axis=axis)._map_partitions(func)
return self.groupby(axis=axis)._map_partitions(
func, index=pd.unique(self.index))
def sum(self, axis=None, skipna=True, level=None, numeric_only=None):
"""Perform a sum across the DataFrame.
@@ -262,9 +284,14 @@ class DataFrame(object):
Returns:
The sum of the DataFrame.
"""
intermediate_index = [idx
for _ in range(len(self._df))
for idx in self.columns]
sum_of_partitions = self._map_partitions(
lambda df: df.sum(axis=axis, skipna=skipna, level=level,
numeric_only=numeric_only))
numeric_only=numeric_only),
index=intermediate_index)
return sum_of_partitions.reduce_by_index(
lambda df: df.sum(axis=axis, skipna=skipna, level=level,
@@ -276,6 +303,10 @@ class DataFrame(object):
Returns:
A new DataFrame with the applied absolute value.
"""
for t in self.dtypes:
if np.dtype('O') == t:
# TODO Give a more accurate error to Pandas
raise TypeError("bad operand type for abs():", "str")
return self._map_partitions(lambda df: df.abs())
def isin(self, values):
@@ -321,7 +352,7 @@ class DataFrame(object):
A pandas Index for this DataFrame.
"""
# Each partition should have the same index, so we'll use 0's
return ray.get(_deploy_func.remote(lambda df: df.keys(), self._df[0]))
return self.columns
def transpose(self, *args, **kwargs):
"""Transpose columns and rows for the DataFrame.
@@ -331,8 +362,14 @@ class DataFrame(object):
Returns:
A new DataFrame transposed from this DataFrame.
"""
temp_index = [idx
for _ in range(len(self._df))
for idx in self.columns]
temp_columns = self.index
local_transpose = self._map_partitions(
lambda df: df.transpose(*args, **kwargs))
lambda df: df.transpose(*args, **kwargs), index=temp_index)
local_transpose.columns = temp_columns
# Sum will collapse the NAs from the groupby
return local_transpose.reduce_by_index(
@@ -387,17 +424,15 @@ class DataFrame(object):
if axis is None or axis == 0:
df = self.T
axis = 1
ordered_index = df.columns
else:
df = self
ordered_index = df.index
mapped = df._map_partitions(lambda df: df.all(axis,
bool_only,
skipna,
level,
**kwargs))
return to_pandas(mapped)[ordered_index]
return to_pandas(mapped)
def any(self, axis=None, bool_only=None, skipna=None, level=None,
**kwargs):
@@ -410,17 +445,15 @@ class DataFrame(object):
if axis is None or axis == 0:
df = self.T
axis = 1
ordered_index = df.columns
else:
df = self
ordered_index = df.index
mapped = df._map_partitions(lambda df: df.any(axis,
bool_only,
skipna,
level,
**kwargs))
return to_pandas(mapped)[ordered_index]
return to_pandas(mapped)
def append(self, other, ignore_index=False, verify_integrity=False):
raise NotImplementedError("Not Yet implemented.")
@@ -514,14 +547,17 @@ class DataFrame(object):
def count(self, axis=0, level=None, numeric_only=False):
if axis == 1:
original_index = self.index
return self.T.count(axis=0,
level=level,
numeric_only=numeric_only)[original_index]
numeric_only=numeric_only)
else:
temp_index = [idx
for _ in range(len(self._df))
for idx in self.columns]
return sum(ray.get(self._map_partitions(lambda df: df.count(
axis=axis, level=level, numeric_only=numeric_only
))._df))
), index=temp_index)._df))
def cov(self, min_periods=None):
raise NotImplementedError("Not Yet implemented.")
@@ -677,20 +713,29 @@ class DataFrame(object):
Returns:
A new dataframe with the first n rows of the dataframe.
"""
sizes = ray.get(self._map_partitions(lambda df: df.size)._df)
new_dfs = []
i = 0
while n > 0 and i < len(self._df):
if (n - sizes[i]) < 0:
new_dfs.append(_deploy_func.remote(lambda df: df.head(n),
self._df[i]))
break
else:
new_dfs.append(self._df[i])
n -= sizes[i]
i += 1
sizes = self._lengths
return DataFrame(new_dfs, self.columns)
if n >= sum(sizes):
return self
cumulative = np.cumsum(np.array(sizes))
new_dfs = [self._df[i]
for i in range(len(cumulative))
if cumulative[i] < n]
last_index = len(new_dfs)
# this happens when we only need from the first partition
if last_index == 0:
num_to_transfer = n
else:
num_to_transfer = n - cumulative[last_index - 1]
new_dfs.append(_deploy_func.remote(lambda df: df.head(num_to_transfer),
self._df[last_index]))
index = self._index.head(n).index
return DataFrame(new_dfs, self.columns, index=index)
def hist(self, data, column=None, by=None, grid=True, xlabelsize=None,
xrot=None, ylabelsize=None, yrot=None, ax=None, sharex=False,
@@ -708,6 +753,10 @@ class DataFrame(object):
A Series with the index for each maximum value for the axis
specified.
"""
for t in self.dtypes:
if np.dtype('O') == t:
# TODO Give a more accurate error to Pandas
raise TypeError("bad operand type for abs():", "str")
if axis == 1:
return to_pandas(self._map_partitions(
lambda df: df.idxmax(axis=axis, skipna=skipna)))
@@ -725,6 +774,10 @@ class DataFrame(object):
A Series with the index for each minimum value for the axis
specified.
"""
for t in self.dtypes:
if np.dtype('O') == t:
# TODO Give a more accurate error to Pandas
raise TypeError("bad operand type for abs():", "str")
if axis == 1:
return to_pandas(self._map_partitions(
lambda df: df.idxmin(axis=axis, skipna=skipna)))
@@ -1165,21 +1218,31 @@ class DataFrame(object):
Returns:
A new dataframe with the last n rows of this dataframe.
"""
sizes = ray.get(self._map_partitions(lambda df: df.size)._df)
new_dfs = []
i = len(self._df) - 1
while n > 0 and i >= 0:
if (n - sizes[i]) < 0:
new_dfs.append(_deploy_func.remote(lambda df: df.head(n),
self._df[i]))
break
else:
new_dfs.append(self._df[i])
n -= sizes[i]
i -= 1
# we were adding in reverse order, so make it right.
new_dfs.reverse()
return DataFrame(new_dfs, self.columns)
sizes = self._lengths
if n >= sum(sizes):
return self
cumulative = np.cumsum(np.array(sizes.reverse()))
reverse_dfs = self._df.reverse()
new_dfs = [reverse_dfs[i]
for i in range(len(cumulative))
if cumulative[i] < n]
last_index = len(new_dfs)
# this happens when we only need from the last partition
if last_index == 0:
num_to_transfer = n
else:
num_to_transfer = n - cumulative[last_index - 1]
new_dfs.append(_deploy_func.remote(lambda df: df.tail(num_to_transfer),
reverse_dfs[last_index])).reverse()
index = self._index.tail(n).index
return DataFrame(new_dfs, self.columns, index=index)
def take(self, indices, axis=0, convert=None, is_copy=True, **kwargs):
raise NotImplementedError("Not Yet implemented.")
@@ -1623,27 +1686,24 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True):
elif chunksize is None:
raise ValueError("The number of partitions or chunksize must be set.")
old_index = df.index
temp_df = df
# TODO stop reassigning df
dataframes = []
lengths = []
while len(df) > chunksize:
t_df = df[:chunksize]
while len(temp_df) > chunksize:
t_df = temp_df[:chunksize]
lengths.append(len(t_df))
# reindex here because we want a pd.RangeIndex within the partitions.
# It is smaller and sometimes faster.
t_df.reindex()
top = ray.put(t_df)
dataframes.append(top)
df = df[chunksize:]
temp_df = temp_df[chunksize:]
else:
dataframes.append(ray.put(df))
lengths.append(len(df))
dataframes.append(ray.put(temp_df))
lengths.append(len(temp_df))
ray_index = Index.from_pandas(old_index, lengths)
return DataFrame(dataframes, df.columns, index=ray_index)
return DataFrame(dataframes, df.columns, index=df.index)
def to_pandas(df):
-56
View File
@@ -1,56 +0,0 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
class Index(object):
    """Lookup table from row labels to positions inside the partitions.

    Each label maps to a ``(partition, index_within_partition)`` tuple.
    Because the table is a dict, duplicate labels are not representable:
    a later duplicate overwrites the earlier entry.
    """

    def __init__(self, idx, pandas_type):
        """Wrap an existing label-to-position mapping.

        Args:
            idx (dict): Maps each row label to a
                ``(partition, index_within_partition)`` tuple.
            pandas_type (type): The pandas Index class the labels came
                from; ``to_pandas`` uses it to decide whether to rebuild
                a ``pd.RangeIndex``.
        """
        self.idx = idx
        self.pandas_type = pandas_type

    def __getitem__(self, item):
        """Return the ``(partition, index_within_partition)`` for a label."""
        return self.idx[item]

    def __len__(self):
        """Return the number of labels held by this index."""
        return len(self.idx)

    @classmethod
    def to_pandas(cls, index):
        """Convert a Ray Index object to a Pandas Index object.

        Args:
            index (ray.Index): A Ray Index object.

        Returns:
            A pandas Index object. For a ``pd.RangeIndex`` source the
            range is rebuilt from the sorted labels, with the step
            inferred from the first two labels (so the result is always
            ascending); an empty index yields an empty ``pd.RangeIndex``.
        """
        keys = list(index.idx.keys())
        if index.pandas_type is not pd.RangeIndex:
            return pd.Index(keys)

        # min()/max() raise on an empty sequence, so handle it explicitly.
        if not keys:
            return pd.RangeIndex(0)

        # Sort rather than rely on dict ordering, then recover the step so
        # that a stepped range keeps its original length.
        keys.sort()
        start = keys[0]
        step = keys[1] - start if len(keys) > 1 else 1
        return pd.RangeIndex(start, keys[-1] + step, step)

    @classmethod
    def from_pandas(cls, pd_index, lengths):
        """Convert a Pandas Index object to a Ray Index object.

        Args:
            pd_index (pd.Index): A Pandas Index object.
            lengths ([int]): A list of lengths for the partitions.

        Returns:
            A Ray Index object.

        Raises:
            ValueError: If the number of labels differs from the total
                number of rows described by ``lengths``.
        """
        dest_indices = [(partition, row)
                        for partition, length in enumerate(lengths)
                        for row in range(length)]
        if len(pd_index) != len(dest_indices):
            raise ValueError(
                "Length of index given does not match current dataframe")

        # Labels and positions are paired in order, one per row.
        return Index(dict(zip(pd_index, dest_indices)), type(pd_index))
+60 -11
View File
@@ -194,6 +194,7 @@ def test_int_dataframe():
test___deepcopy__(ray_df, pandas_df)
test_bool(ray_df, pandas_df)
test_count(ray_df, pandas_df)
test_head(ray_df, pandas_df, 2)
test_head(ray_df, pandas_df)
test_tail(ray_df, pandas_df)
test_idxmax(ray_df, pandas_df)
@@ -224,7 +225,7 @@ def test_float_dataframe():
'col4': [12.0, 13.0, 14.0, 15.0],
'col5': [0.0, 0.0, 0.0, 0.0]})
ray_df = rdf.from_pandas(pandas_df, 2)
ray_df = rdf.from_pandas(pandas_df, 3)
testfuncs = [lambda x: x + 1,
lambda x: str(x),
@@ -266,6 +267,7 @@ def test_float_dataframe():
test___deepcopy__(ray_df, pandas_df)
test_bool(ray_df, pandas_df)
test_count(ray_df, pandas_df)
test_head(ray_df, pandas_df, 3)
test_head(ray_df, pandas_df)
test_tail(ray_df, pandas_df)
test_idxmax(ray_df, pandas_df)
@@ -322,25 +324,47 @@ def test_mixed_dtype_dataframe():
test_copy(ray_df)
test_sum(ray_df, pandas_df)
with pytest.raises(TypeError):
test_abs(ray_df, pandas_df)
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
test_bool(ray_df, pandas_df)
test_count(ray_df, pandas_df)
test_head(ray_df, pandas_df, 2)
test_head(ray_df, pandas_df)
test_tail(ray_df, pandas_df)
with pytest.raises(TypeError):
test_idxmax(ray_df, pandas_df)
with pytest.raises(TypeError):
test_idxmin(ray_df, pandas_df)
test_pop(ray_df, pandas_df)
test_max(ray_df, pandas_df)
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
for key in keys:
test_get(ray_df, pandas_df, key)
test_get_dtype_counts(ray_df, pandas_df)
test_get_ftype_counts(ray_df, pandas_df)
test_items(ray_df, pandas_df)
test_iterrows(ray_df, pandas_df)
test_items(ray_df, pandas_df)
test_iteritems(ray_df, pandas_df)
test_itertuples(ray_df, pandas_df)
test_max(ray_df, pandas_df)
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
def test_nan_dataframe():
pandas_df = pd.DataFrame({
@@ -377,14 +401,39 @@ def test_nan_dataframe():
test_copy(ray_df)
test_sum(ray_df, pandas_df)
test_abs(ray_df, pandas_df)
test_keys(ray_df, pandas_df)
test_transpose(ray_df, pandas_df)
test_round(ray_df, pandas_df)
test_all(ray_df, pandas_df)
test_any(ray_df, pandas_df)
test___getitem__(ray_df, pandas_df)
test___delitem__(ray_df, pandas_df)
test___copy__(ray_df, pandas_df)
test___deepcopy__(ray_df, pandas_df)
test_bool(ray_df, pandas_df)
test_count(ray_df, pandas_df)
test_head(ray_df, pandas_df, 2)
test_head(ray_df, pandas_df)
test_tail(ray_df, pandas_df)
test_idxmax(ray_df, pandas_df)
test_idxmin(ray_df, pandas_df)
test_pop(ray_df, pandas_df)
test_max(ray_df, pandas_df)
test_min(ray_df, pandas_df)
test_notna(ray_df, pandas_df)
test_notnull(ray_df, pandas_df)
for key in keys:
test_get(ray_df, pandas_df, key)
test_get_dtype_counts(ray_df, pandas_df)
test_get_ftype_counts(ray_df, pandas_df)
test_iterrows(ray_df, pandas_df)
test_items(ray_df, pandas_df)
test_iteritems(ray_df, pandas_df)
test_itertuples(ray_df, pandas_df)
def test_add():
@@ -816,8 +865,8 @@ def test_gt():
@pytest.fixture
def test_head(ray_df, pandas_df):
ray_df_equals_pandas(ray_df.head(), pandas_df.head())
def test_head(ray_df, pandas_df, n=5):
ray_df_equals_pandas(ray_df.head(n), pandas_df.head(n))
def test_hist():
@@ -906,7 +955,7 @@ def test_itertuples(ray_df, pandas_df):
ray_it_default = ray_df.itertuples()
pandas_it_default = pandas_df.itertuples()
for ray_row, pandas_row in zip(ray_it_default, pandas_it_default):
assert ray_row == pandas_row
np.testing.assert_equal(ray_row, pandas_row)
# test all combinations of custom params
indices = [True, False]
@@ -917,7 +966,7 @@ def test_itertuples(ray_df, pandas_df):
ray_it_custom = ray_df.itertuples(index=index, name=name)
pandas_it_custom = pandas_df.itertuples(index=index, name=name)
for ray_row, pandas_row in zip(ray_it_custom, pandas_it_custom):
assert ray_row == pandas_row
np.testing.assert_equal(ray_row, pandas_row)
def test_join():