[Dataframes] Reorganization (#1676)

* moved helper functions for dataframes into df_utils

* Updating based on review comments

* fixed bug with from_pandas

* Updating formatting

* Fix lint
This commit is contained in:
Jae Min Kim
2018-03-12 19:13:33 -07:00
committed by Devin Petersohn
parent 6455ec934b
commit 737120952e
4 changed files with 239 additions and 227 deletions
+1 -4
View File
@@ -28,14 +28,11 @@ def get_npartitions():
# We import these files after the above two functions
# because they depend on npartitions.
from .dataframe import DataFrame # noqa: 402
from .dataframe import from_pandas # noqa: 402
from .dataframe import to_pandas # noqa: 402
from .series import Series # noqa: 402
from .io import (read_csv, read_parquet) # noqa: 402
__all__ = [
"DataFrame", "from_pandas", "to_pandas", "Series", "read_csv",
"read_parquet"
"DataFrame", "Series", "read_csv", "read_parquet"
]
try:
+7 -151
View File
@@ -18,6 +18,13 @@ import warnings
import numpy as np
import ray
import itertools
from .utils import (
_get_lengths,
to_pandas,
_shuffle,
_local_groupby,
_deploy_func,
_compute_length_and_index)
class DataFrame(object):
@@ -2777,154 +2784,3 @@ class DataFrame(object):
"""
from .indexing import _iLoc_Indexer
return _iLoc_Indexer(self)
def _get_lengths(df):
"""Gets the length of the dataframe.
Args:
df: A remote pd.DataFrame object.
Returns:
Returns an integer length of the dataframe object. If the attempt
fails, returns 0 as the length.
"""
try:
return len(df)
# Because we sometimes have cases where we have summary statistics in our
# DataFrames
except TypeError:
return 0
@ray.remote
def _shuffle(df, indices, chunksize):
    """Shuffle data by sending it through the Ray Store.

    Args:
        df (pd.DataFrame): The pandas DataFrame to shuffle.
        indices ([any]): The list of indices for the DataFrame.
        chunksize (int): The number of indices to send.

    Returns:
        The list of pd.DataFrame objects in order of their assignment. This
        order is important because it determines which task will get the
        data. The list always holds at least one frame, even when indices
        is empty.

    Raises:
        ValueError: If chunksize is negative, or is zero while indices is
            non-empty. The chunking loop could never finish in those cases.
    """
    # Slicing by a non-positive chunksize never shrinks `indices`, so the
    # loop below would spin forever inside the remote task.
    if chunksize < 1 and len(indices) > chunksize:
        raise ValueError("chunksize must be a positive integer.")

    partition = []
    while len(indices) > chunksize:
        partition.append(df.reindex(indices[:chunksize]))
        indices = indices[chunksize:]
    # Whatever is left (at most chunksize labels, possibly none) forms the
    # final chunk.
    partition.append(df.reindex(indices))
    return partition
@ray.remote
def _local_groupby(df_rows, axis=0):
    """Concatenate the blocks sent to this partition and group by index.

    Args:
        df_rows ([pd.DataFrame]): A list of dataframes for this partition.
            Goes through the Ray object store.
        axis (int): The axis handed to pd.concat when joining df_rows.

    Returns:
        The groupby object obtained by grouping the concatenated frame on
        its own index.
    """
    combined = pd.concat(df_rows, axis=axis)
    index_keys = combined.index
    return combined.groupby(index_keys)
@ray.remote
def _deploy_func(func, dataframe, *args):
    """Deploys a function for the _map_partitions call.

    Args:
        func (callable): The function to run against the partition.
        dataframe (pandas.DataFrame): The pandas DataFrame for this
            partition.
        *args: Extra positional arguments forwarded to func after
            dataframe.

    Returns:
        The value returned by func for this partition.
    """
    # Unpacking an empty tuple adds no arguments, so a single call covers
    # both the "no extra args" and "extra args" cases.
    return func(dataframe, *args)
def from_pandas(df, npartitions=None, chunksize=None, sort=True):
    """Converts a pandas DataFrame to a Ray DataFrame.

    Args:
        df (pandas.DataFrame): The pandas DataFrame to convert.
        npartitions (int): The number of partitions to split the DataFrame
            into. Has priority over chunksize. The rows per partition are
            derived by truncating division, so more partitions than
            requested may be produced when len(df) is not evenly divisible.
        chunksize (int): The number of rows to put in each partition.
        sort (bool): Accepted for interface compatibility; this function
            does not currently use it.

    Returns:
        A new Ray DataFrame object.

    Raises:
        ValueError: If neither npartitions nor chunksize is set, if
            npartitions is not positive, or if chunksize is not positive
            while there are rows to split.
    """
    if npartitions is not None:
        if npartitions < 1:
            raise ValueError("The number of partitions must be positive.")
        # Clamp to one row per partition: asking for more partitions than
        # rows would otherwise give chunksize 0 and an endless loop below.
        chunksize = max(1, int(len(df) / npartitions))
    elif chunksize is None:
        raise ValueError("The number of partitions or chunksize must be set.")

    # A non-positive chunksize never shrinks the remaining frame.
    if chunksize < 1 and len(df) > chunksize:
        raise ValueError("The chunksize must be a positive integer.")

    dataframes = []
    remaining = df
    while len(remaining) > chunksize:
        # reset_index here because we want a pd.RangeIndex
        # within the partitions. It is smaller and sometimes faster.
        head = remaining[:chunksize].reset_index(drop=True)
        dataframes.append(ray.put(head))
        remaining = remaining[chunksize:]
    # The leftover rows (at most chunksize of them) form the last partition.
    dataframes.append(ray.put(remaining.reset_index(drop=True)))

    return DataFrame(dataframes, df.columns, index=df.index)
def to_pandas(df):
    """Converts a Ray DataFrame to a pandas DataFrame/Series.

    Args:
        df (ray.DataFrame): The Ray DataFrame to convert.

    Returns:
        A new pandas DataFrame built by concatenating the objects fetched
        from df._df, carrying df's index and columns.
    """
    # df._df is presumably the list of partition object IDs -- confirm
    # against the DataFrame constructor.
    fetched_parts = ray.get(df._df)
    result = pd.concat(fetched_parts)
    # Restore the labels tracked on the Ray DataFrame itself.
    result.index = df.index
    result.columns = df.columns
    return result
@ray.remote(num_return_vals=2)
def _compute_length_and_index(dfs):
    """Compute partition lengths and a default positional index mapping.

    Args:
        dfs: The partitions to measure; each one is handed to
            _deploy_func.remote together with _get_lengths.

    Returns:
        Two values: the list of per-partition lengths, and a pd.DataFrame
        with one row per element holding its "partition" number and its
        "index_within_partition" offset.
    """
    length_futures = [_deploy_func.remote(_get_lengths, d) for d in dfs]
    lengths = ray.get(length_futures)

    # One entry per row: which partition the row lives in ...
    partition_ids = [part
                     for part, size in enumerate(lengths)
                     for _ in range(size)]
    # ... and the row's position inside that partition.
    offsets = [offset for size in lengths for offset in range(size)]

    dest_indices = {"partition": partition_ids,
                    "index_within_partition": offsets}
    return lengths, pd.DataFrame(dest_indices)
+74 -72
View File
@@ -7,19 +7,23 @@ import numpy as np
import pandas as pd
import pandas.util.testing as tm
import ray.dataframe as rdf
from ray.dataframe.utils import (
to_pandas,
from_pandas
)
from pandas.tests.frame.common import TestData
@pytest.fixture
def ray_df_equals_pandas(ray_df, pandas_df):
return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
@pytest.fixture
def ray_df_equals(ray_df1, ray_df2):
return rdf.to_pandas(ray_df1).sort_index().equals(
rdf.to_pandas(ray_df2).sort_index()
return to_pandas(ray_df1).sort_index().equals(
to_pandas(ray_df2).sort_index()
)
@@ -150,7 +154,7 @@ def create_test_dataframe():
'col4': [12, 13, 14, 15],
'col5': [0, 0, 0, 0]})
return rdf.from_pandas(df, 2)
return from_pandas(df, 2)
def test_int_dataframe():
@@ -160,7 +164,7 @@ def test_int_dataframe():
'col3': [8, 9, 10, 11],
'col4': [12, 13, 14, 15],
'col5': [0, 0, 0, 0]})
ray_df = rdf.from_pandas(pandas_df, 2)
ray_df = from_pandas(pandas_df, 2)
testfuncs = [lambda x: x + 1,
lambda x: str(x),
@@ -270,7 +274,7 @@ def test_float_dataframe():
'col4': [12.0, 13.0, 14.0, 15.0],
'col5': [0.0, 0.0, 0.0, 0.0]})
ray_df = rdf.from_pandas(pandas_df, 3)
ray_df = from_pandas(pandas_df, 3)
testfuncs = [lambda x: x + 1,
lambda x: str(x),
@@ -379,7 +383,7 @@ def test_mixed_dtype_dataframe():
'col3': [8.0, 9.4, 10.1, 11.3],
'col4': ['a', 'b', 'c', 'd']})
ray_df = rdf.from_pandas(pandas_df, 2)
ray_df = from_pandas(pandas_df, 2)
testfuncs = [lambda x: x + x,
lambda x: str(x),
@@ -496,7 +500,7 @@ def test_nan_dataframe():
'col3': [8, np.nan, 10, 11],
'col4': [np.nan, 13, 14, 15]})
ray_df = rdf.from_pandas(pandas_df, 2)
ray_df = from_pandas(pandas_df, 2)
testfuncs = [lambda x: x + x,
lambda x: str(x),
@@ -711,7 +715,7 @@ def test_bfill(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
ray_df = from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.bfill(),
test_data.tsframe.bfill()
@@ -725,7 +729,7 @@ def test_bool(ray_df, pd_df):
pd_df.bool()
single_bool_pd_df = pd.DataFrame([True])
single_bool_ray_df = rdf.from_pandas(single_bool_pd_df, 1)
single_bool_ray_df = from_pandas(single_bool_pd_df, 1)
assert single_bool_pd_df.bool() == single_bool_ray_df.bool()
@@ -886,7 +890,7 @@ def test_dot():
def test_drop():
ray_df = create_test_dataframe()
simple = pd.DataFrame({"A": [1, 2, 3, 4], "B": [0, 1, 2, 3]})
ray_simple = rdf.from_pandas(simple, 2)
ray_simple = from_pandas(simple, 2)
assert ray_df_equals_pandas(ray_simple.drop("A", axis=1), simple[['B']])
assert ray_df_equals_pandas(ray_simple.drop(["A", "B"], axis='columns'),
simple[[]])
@@ -913,7 +917,7 @@ def test_drop():
# non-unique - wheee!
nu_df = pd.DataFrame(pd.compat.lzip(range(3), range(-3, 1), list('abc')),
columns=['a', 'a', 'b'])
ray_nu_df = rdf.from_pandas(nu_df, 3)
ray_nu_df = from_pandas(nu_df, 3)
assert ray_df_equals_pandas(ray_nu_df.drop('a', axis=1), nu_df[['b']])
assert ray_df_equals_pandas(ray_nu_df.drop('b', axis='columns'),
nu_df['a'])
@@ -921,7 +925,7 @@ def test_drop():
nu_df = nu_df.set_index(pd.Index(['X', 'Y', 'X']))
nu_df.columns = list('abc')
ray_nu_df = rdf.from_pandas(nu_df, 3)
ray_nu_df = from_pandas(nu_df, 3)
assert ray_df_equals_pandas(ray_nu_df.drop('X', axis='rows'),
nu_df.loc[["Y"], :])
assert ray_df_equals_pandas(ray_nu_df.drop(['X', 'Y'], axis=0),
@@ -930,7 +934,7 @@ def test_drop():
# inplace cache issue
# GH 5628
df = pd.DataFrame(np.random.randn(10, 3), columns=list('abc'))
ray_df = rdf.from_pandas(df, 2)
ray_df = from_pandas(df, 2)
expected = df[~(df.b > 0)]
ray_df.drop(labels=df[df.b > 0].index, inplace=True)
assert ray_df_equals_pandas(ray_df, expected)
@@ -941,7 +945,7 @@ def test_drop_api_equivalence():
df = pd.DataFrame([[1, 2, 3], [3, 4, 5], [5, 6, 7]],
index=['a', 'b', 'c'],
columns=['d', 'e', 'f'])
ray_df = rdf.from_pandas(df, 3)
ray_df = from_pandas(df, 3)
res1 = ray_df.drop('a')
res2 = ray_df.drop(index='a')
@@ -997,14 +1001,14 @@ def test_eq():
def test_equals():
pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
'col2': [2, 3, 4, 1]})
ray_df1 = rdf.from_pandas(pandas_df1, 2)
ray_df2 = rdf.from_pandas(pandas_df1, 3)
ray_df1 = from_pandas(pandas_df1, 2)
ray_df2 = from_pandas(pandas_df1, 3)
assert ray_df1.equals(ray_df2)
pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3],
'col2': [2, 3, 5, 1]})
ray_df3 = rdf.from_pandas(pandas_df2, 4)
ray_df3 = from_pandas(pandas_df2, 4)
assert not ray_df3.equals(ray_df1)
assert not ray_df3.equals(ray_df2)
@@ -1013,7 +1017,7 @@ def test_equals():
def test_eval_df_use_case():
df = pd.DataFrame({'a': np.random.randn(10),
'b': np.random.randn(10)})
ray_df = rdf.from_pandas(df, 5)
ray_df = from_pandas(df, 5)
df.eval("e = arctan2(sin(a), b)",
engine='python',
parser='pandas', inplace=True)
@@ -1029,7 +1033,7 @@ def test_eval_df_use_case():
def test_eval_df_arithmetic_subexpression():
df = pd.DataFrame({'a': np.random.randn(10),
'b': np.random.randn(10)})
ray_df = rdf.from_pandas(df, 5)
ray_df = from_pandas(df, 5)
df.eval("e = sin(a + b)",
engine='python',
parser='pandas', inplace=True)
@@ -1061,7 +1065,7 @@ def test_ffill(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
ray_df = from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.ffill(),
test_data.tsframe.ffill()
@@ -1096,12 +1100,12 @@ def test_fillna_sanity(num_partitions=2):
tf.loc[tf.index[-5:], 'A'] = np.nan
zero_filled = test_data.tsframe.fillna(0)
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions).fillna(0)
ray_df = from_pandas(test_data.tsframe, num_partitions).fillna(0)
assert ray_df_equals_pandas(ray_df, zero_filled)
padded = test_data.tsframe.fillna(method='pad')
ray_df = rdf.from_pandas(test_data.tsframe,
num_partitions).fillna(method='pad')
ray_df = from_pandas(test_data.tsframe,
num_partitions).fillna(method='pad')
assert ray_df_equals_pandas(ray_df, padded)
# mixed type
@@ -1110,20 +1114,20 @@ def test_fillna_sanity(num_partitions=2):
mf.loc[mf.index[-10:], 'A'] = np.nan
result = test_data.mixed_frame.fillna(value=0)
ray_df = rdf.from_pandas(test_data.mixed_frame,
num_partitions).fillna(value=0)
ray_df = from_pandas(test_data.mixed_frame,
num_partitions).fillna(value=0)
assert ray_df_equals_pandas(ray_df, result)
result = test_data.mixed_frame.fillna(method='pad')
ray_df = rdf.from_pandas(test_data.mixed_frame,
num_partitions).fillna(method='pad')
ray_df = from_pandas(test_data.mixed_frame,
num_partitions).fillna(method='pad')
assert ray_df_equals_pandas(ray_df, result)
pytest.raises(ValueError, test_data.tsframe.fillna)
pytest.raises(ValueError, rdf.from_pandas(test_data.tsframe,
num_partitions).fillna)
pytest.raises(ValueError, from_pandas(test_data.tsframe,
num_partitions).fillna)
with pytest.raises(ValueError):
rdf.from_pandas(test_data.tsframe, num_partitions).fillna(
from_pandas(test_data.tsframe, num_partitions).fillna(
5, method='ffill'
)
@@ -1131,11 +1135,11 @@ def test_fillna_sanity(num_partitions=2):
mf = test_data.mixed_float.reindex(columns=['A', 'B', 'D'])
mf.loc[mf.index[-10:], 'A'] = np.nan
result = mf.fillna(value=0)
ray_df = rdf.from_pandas(mf, num_partitions).fillna(value=0)
ray_df = from_pandas(mf, num_partitions).fillna(value=0)
assert ray_df_equals_pandas(ray_df, result)
result = mf.fillna(method='pad')
ray_df = rdf.from_pandas(mf, num_partitions).fillna(method='pad')
ray_df = from_pandas(mf, num_partitions).fillna(method='pad')
assert ray_df_equals_pandas(ray_df, result)
# TODO: Use this when Arrow issue resolves:
@@ -1151,10 +1155,10 @@ def test_fillna_sanity(num_partitions=2):
'b', 'b', np.nan, 'b'], ['c', 'c', np.nan, 'c']])
result = df.fillna({2: 'foo'})
ray_df = rdf.from_pandas(df, num_partitions).fillna({2: 'foo'})
ray_df = from_pandas(df, num_partitions).fillna({2: 'foo'})
assert ray_df_equals_pandas(ray_df, result)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
df.fillna({2: 'foo'}, inplace=True)
ray_df.fillna({2: 'foo'}, inplace=True)
assert ray_df_equals_pandas(ray_df, result)
@@ -1165,7 +1169,7 @@ def test_fillna_sanity(num_partitions=2):
df.iloc[3:5, 2] = np.nan
# result = df.fillna(999, limit=1)
# ray_df = rdf.from_pandas(df, num_partitions).fillna(999, limit=1)
# ray_df = from_pandas(df, num_partitions).fillna(999, limit=1)
# assert ray_df_equals_pandas(ray_df, result)
@@ -1176,7 +1180,7 @@ def test_fillna_sanity(num_partitions=2):
'Date2': [pd.Timestamp("2013-1-1"), pd.NaT]
})
result = df.fillna(value={'Date': df['Date2']})
ray_df = rdf.from_pandas(df, num_partitions).fillna(
ray_df = from_pandas(df, num_partitions).fillna(
value={'Date': df['Date2']}
)
assert ray_df_equals_pandas(ray_df, result)
@@ -1188,13 +1192,13 @@ def test_fillna_sanity(num_partitions=2):
"""
df = pd.DataFrame({'A': [pd.Timestamp('2012-11-11 00:00:00+01:00'),
pd.NaT]})
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(ray_df.fillna(method='pad'),
df.fillna(method='pad'))
df = pd.DataFrame({'A': [pd.NaT,
pd.Timestamp('2012-11-11 00:00:00+01:00')]})
ray_df = rdf.from_pandas(df, num_partitions).fillna(method='bfill')
ray_df = from_pandas(df, num_partitions).fillna(method='bfill')
assert ray_df_equals_pandas(ray_df, df.fillna(method='bfill'))
"""
@@ -1205,13 +1209,13 @@ def test_fillna_downcast(num_partitions=2):
# infer int64 from float64
df = pd.DataFrame({'a': [1., np.nan]})
result = df.fillna(0, downcast='infer')
ray_df = rdf.from_pandas(df, num_partitions).fillna(0, downcast='infer')
ray_df = from_pandas(df, num_partitions).fillna(0, downcast='infer')
assert ray_df_equals_pandas(ray_df, result)
# infer int64 from float64 when fillna value is a dict
df = pd.DataFrame({'a': [1., np.nan]})
result = df.fillna({'a': 0}, downcast='infer')
ray_df = rdf.from_pandas(df, num_partitions).fillna(
ray_df = from_pandas(df, num_partitions).fillna(
{'a': 0}, downcast='infer'
)
assert ray_df_equals_pandas(ray_df, result)
@@ -1222,7 +1226,7 @@ def test_ffill2(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
ray_df = from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill'),
test_data.tsframe.fillna(method='ffill')
@@ -1234,7 +1238,7 @@ def test_bfill2(num_partitions=2):
test_data = TestData()
test_data.tsframe['A'][:5] = np.nan
test_data.tsframe['A'][-5:] = np.nan
ray_df = rdf.from_pandas(test_data.tsframe, num_partitions)
ray_df = from_pandas(test_data.tsframe, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='bfill'),
test_data.tsframe.fillna(method='bfill')
@@ -1247,20 +1251,20 @@ def test_fillna_inplace(num_partitions=2):
df[1][:4] = np.nan
df[3][-4:] = np.nan
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
df.fillna(value=0, inplace=True)
assert not ray_df_equals_pandas(ray_df, df)
ray_df.fillna(value=0, inplace=True)
assert ray_df_equals_pandas(ray_df, df)
ray_df = rdf.from_pandas(df, num_partitions).fillna(value={0: 0},
inplace=True)
ray_df = from_pandas(df, num_partitions).fillna(value={0: 0},
inplace=True)
assert ray_df is None
df[1][:4] = np.nan
df[3][-4:] = np.nan
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
df.fillna(method='ffill', inplace=True)
assert not ray_df_equals_pandas(ray_df, df)
@@ -1277,14 +1281,14 @@ def test_frame_fillna_limit(num_partitions=2):
expected = df[:2].reindex(index)
expected = expected.fillna(method='pad', limit=5)
ray_df = rdf.from_pandas(df[:2].reindex(index), num_partitions).fillna(
ray_df = from_pandas(df[:2].reindex(index), num_partitions).fillna(
method='pad', limit=5
)
assert ray_df_equals_pandas(ray_df, expected)
expected = df[-2:].reindex(index)
expected = expected.fillna(method='backfill', limit=5)
ray_df = rdf.from_pandas(df[-2:].reindex(index), num_partitions).fillna(
ray_df = from_pandas(df[-2:].reindex(index), num_partitions).fillna(
method='backfill', limit=5
)
assert ray_df_equals_pandas(ray_df, expected)
@@ -1296,14 +1300,14 @@ def test_frame_pad_backfill_limit(num_partitions=2):
df = pd.DataFrame(np.random.randn(10, 4), index=index)
result = df[:2].reindex(index)
ray_df = rdf.from_pandas(result, num_partitions)
ray_df = from_pandas(result, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='pad', limit=5),
result.fillna(method='pad', limit=5)
)
result = df[-2:].reindex(index)
ray_df = rdf.from_pandas(result, num_partitions)
ray_df = from_pandas(result, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='backfill', limit=5),
result.fillna(method='backfill', limit=5)
@@ -1317,7 +1321,7 @@ def test_fillna_dtype_conversion(num_partitions=2):
# empty block
df = pd.DataFrame(index=range(3), columns=['A', 'B'], dtype='float64')
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna('nan'),
df.fillna('nan')
@@ -1325,7 +1329,7 @@ def test_fillna_dtype_conversion(num_partitions=2):
# equiv of replace
df = pd.DataFrame(dict(A=[1, np.nan], B=[1., 2.]))
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
for v in ['', 1, np.nan, 1.0]:
assert ray_df_equals_pandas(
ray_df.fillna(v),
@@ -1338,7 +1342,7 @@ def test_fillna_skip_certain_blocks(num_partitions=2):
# don't try to fill boolean, int blocks
df = pd.DataFrame(np.random.randn(10, 4).astype(int))
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
# it works!
assert ray_df_equals_pandas(
@@ -1352,7 +1356,7 @@ def test_fillna_dict_series(num_partitions=2):
df = pd.DataFrame({'a': [np.nan, 1, 2, np.nan, np.nan],
'b': [1, 2, 3, np.nan, np.nan],
'c': [np.nan, 1, 2, 3, 4]})
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna({'a': 0, 'b': 5}),
@@ -1379,7 +1383,7 @@ def test_fillna_dataframe(num_partitions=2):
'b': [1, 2, 3, np.nan, np.nan],
'c': [np.nan, 1, 2, 3, 4]},
index=list('VWXYZ'))
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
# df2 may have different index and columns
df2 = pd.DataFrame({'a': [np.nan, 10, 20, 30, 40],
@@ -1398,7 +1402,7 @@ def test_fillna_dataframe(num_partitions=2):
def test_fillna_columns(num_partitions=2):
df = pd.DataFrame(np.random.randn(10, 10))
df.values[:, ::2] = np.nan
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill', axis=1),
@@ -1406,7 +1410,7 @@ def test_fillna_columns(num_partitions=2):
)
df.insert(6, 'foo', 5)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill', axis=1),
df.fillna(method='ffill', axis=1)
@@ -1416,7 +1420,7 @@ def test_fillna_columns(num_partitions=2):
@pytest.fixture
def test_fillna_invalid_method(num_partitions=2):
test_data = TestData()
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
ray_df = from_pandas(test_data.frame, num_partitions)
with tm.assert_raises_regex(ValueError, 'ffil'):
ray_df.fillna(method='ffil')
@@ -1424,7 +1428,7 @@ def test_fillna_invalid_method(num_partitions=2):
@pytest.fixture
def test_fillna_invalid_value(num_partitions=2):
test_data = TestData()
ray_df = rdf.from_pandas(test_data.frame, num_partitions)
ray_df = from_pandas(test_data.frame, num_partitions)
# list
pytest.raises(TypeError, ray_df.fillna, [1, 2])
# tuple
@@ -1439,7 +1443,7 @@ def test_fillna_col_reordering(num_partitions=2):
cols = ["COL." + str(i) for i in range(5, 0, -1)]
data = np.random.rand(20, 5)
df = pd.DataFrame(index=range(20), columns=cols, data=data)
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna(method='ffill'),
df.fillna(method='ffill')
@@ -1457,7 +1461,7 @@ def test_fillna_datetime_columns(num_partitions=2):
'C': ['foo', 'bar', None],
'D': ['foo2', 'bar2', None]},
index=date_range('20130110', periods=3))
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna('?'),
df.fillna('?')
@@ -1469,7 +1473,7 @@ def test_fillna_datetime_columns(num_partitions=2):
'C': ['foo', 'bar', None],
'D': ['foo2', 'bar2', None]},
index=date_range('20130110', periods=3))
ray_df = rdf.from_pandas(df, num_partitions)
ray_df = from_pandas(df, num_partitions)
assert ray_df_equals_pandas(
ray_df.fillna('?'),
df.fillna('?')
@@ -1915,7 +1919,7 @@ def test_query(ray_df, pandas_df, funcs):
for f in funcs:
pandas_df_new, ray_df_new = pandas_df.query(f), ray_df.query(f)
assert pandas_df_new.equals(rdf.to_pandas(ray_df_new))
assert pandas_df_new.equals(to_pandas(ray_df_new))
def test_radd():
@@ -2237,16 +2241,14 @@ def test_resample():
@pytest.fixture
def test_reset_index(ray_df, pandas_df, inplace=False):
if not inplace:
print(rdf.to_pandas(ray_df.reset_index(inplace=inplace)).index)
print(pandas_df.reset_index(inplace=inplace))
assert rdf.to_pandas(ray_df.reset_index(inplace=inplace)).equals(
assert to_pandas(ray_df.reset_index(inplace=inplace)).equals(
pandas_df.reset_index(inplace=inplace))
else:
ray_df_cp = ray_df.copy()
pd_df_cp = pandas_df.copy()
ray_df_cp.reset_index(inplace=inplace)
pd_df_cp.reset_index(inplace=inplace)
assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp)
assert to_pandas(ray_df_cp).equals(pd_df_cp)
def test_rfloordiv():
@@ -2334,21 +2336,21 @@ def test_sem():
@pytest.fixture
def test_set_axis(ray_df, pandas_df, label, axis):
assert rdf.to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals(
assert to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals(
pandas_df.set_axis(label, axis, inplace=False))
@pytest.fixture
def test_set_index(ray_df, pandas_df, keys, inplace=False):
if not inplace:
assert rdf.to_pandas(ray_df.set_index(keys)).equals(
assert to_pandas(ray_df.set_index(keys)).equals(
pandas_df.set_index(keys))
else:
ray_df_cp = ray_df.copy()
pd_df_cp = pandas_df.copy()
ray_df_cp.set_index(keys, inplace=inplace)
pd_df_cp.set_index(keys, inplace=inplace)
assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp)
assert to_pandas(ray_df_cp).equals(pd_df_cp)
def test_set_value():
@@ -2730,7 +2732,7 @@ def test___unicode__():
@pytest.fixture
def test___neg__(ray_df, pd_df):
ray_df_neg = ray_df.__neg__()
assert pd_df.__neg__().equals(rdf.to_pandas(ray_df_neg))
assert pd_df.__neg__().equals(to_pandas(ray_df_neg))
def test___invert__():
+157
View File
@@ -0,0 +1,157 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas as pd
import ray
def _get_lengths(df):
"""Gets the length of the dataframe.
Args:
df: A remote pd.DataFrame object.
Returns:
Returns an integer length of the dataframe object. If the attempt
fails, returns 0 as the length.
"""
try:
return len(df)
# Because we sometimes have cases where we have summary statistics in our
# DataFrames
except TypeError:
return 0
def from_pandas(df, npartitions=None, chunksize=None):
    """Converts a pandas DataFrame to a Ray DataFrame.

    Args:
        df (pandas.DataFrame): The pandas DataFrame to convert.
        npartitions (int): The number of partitions to split the DataFrame
            into. Has priority over chunksize. The rows per partition are
            derived by truncating division, so more partitions than
            requested may be produced when len(df) is not evenly divisible.
        chunksize (int): The number of rows to put in each partition.

    Returns:
        A new Ray DataFrame object.

    Raises:
        ValueError: If neither npartitions nor chunksize is set, if
            npartitions is not positive, or if chunksize is not positive
            while there are rows to split.
    """
    # Imported here rather than at module level; the dataframe module
    # itself imports helpers from this module.
    from .dataframe import DataFrame

    if npartitions is not None:
        if npartitions < 1:
            raise ValueError("The number of partitions must be positive.")
        # Clamp to one row per partition: asking for more partitions than
        # rows would otherwise give chunksize 0 and an endless loop below.
        chunksize = max(1, int(len(df) / npartitions))
    elif chunksize is None:
        raise ValueError("The number of partitions or chunksize must be set.")

    # A non-positive chunksize never shrinks the remaining frame.
    if chunksize < 1 and len(df) > chunksize:
        raise ValueError("The chunksize must be a positive integer.")

    dataframes = []
    remaining = df
    while len(remaining) > chunksize:
        # reset_index here because we want a pd.RangeIndex
        # within the partitions. It is smaller and sometimes faster.
        head = remaining[:chunksize].reset_index(drop=True)
        dataframes.append(ray.put(head))
        remaining = remaining[chunksize:]
    # The leftover rows (at most chunksize of them) form the last partition.
    dataframes.append(ray.put(remaining.reset_index(drop=True)))

    return DataFrame(dataframes, df.columns, index=df.index)
def to_pandas(df):
    """Converts a Ray DataFrame to a pandas DataFrame/Series.

    Args:
        df (ray.DataFrame): The Ray DataFrame to convert.

    Returns:
        A new pandas DataFrame built by concatenating the objects fetched
        from df._df, carrying df's index and columns.
    """
    # df._df is presumably the list of partition object IDs -- confirm
    # against the DataFrame constructor.
    fetched_parts = ray.get(df._df)
    result = pd.concat(fetched_parts)
    # Restore the labels tracked on the Ray DataFrame itself.
    result.index = df.index
    result.columns = df.columns
    return result
@ray.remote
def _shuffle(df, indices, chunksize):
    """Shuffle data by sending it through the Ray Store.

    Args:
        df (pd.DataFrame): The pandas DataFrame to shuffle.
        indices ([any]): The list of indices for the DataFrame.
        chunksize (int): The number of indices to send.

    Returns:
        The list of pd.DataFrame objects in order of their assignment. This
        order is important because it determines which task will get the
        data. The list always holds at least one frame, even when indices
        is empty.

    Raises:
        ValueError: If chunksize is negative, or is zero while indices is
            non-empty. The chunking loop could never finish in those cases.
    """
    # Slicing by a non-positive chunksize never shrinks `indices`, so the
    # loop below would spin forever inside the remote task.
    if chunksize < 1 and len(indices) > chunksize:
        raise ValueError("chunksize must be a positive integer.")

    partition = []
    while len(indices) > chunksize:
        partition.append(df.reindex(indices[:chunksize]))
        indices = indices[chunksize:]
    # Whatever is left (at most chunksize labels, possibly none) forms the
    # final chunk.
    partition.append(df.reindex(indices))
    return partition
@ray.remote
def _local_groupby(df_rows, axis=0):
    """Concatenate the blocks sent to this partition and group by index.

    Args:
        df_rows ([pd.DataFrame]): A list of dataframes for this partition.
            Goes through the Ray object store.
        axis (int): The axis handed to pd.concat when joining df_rows.

    Returns:
        The groupby object obtained by grouping the concatenated frame on
        its own index.
    """
    combined = pd.concat(df_rows, axis=axis)
    index_keys = combined.index
    return combined.groupby(index_keys)
@ray.remote
def _deploy_func(func, dataframe, *args):
    """Deploys a function for the _map_partitions call.

    Args:
        func (callable): The function to run against the partition.
        dataframe (pandas.DataFrame): The pandas DataFrame for this
            partition.
        *args: Extra positional arguments forwarded to func after
            dataframe.

    Returns:
        The value returned by func for this partition.
    """
    # Unpacking an empty tuple adds no arguments, so a single call covers
    # both the "no extra args" and "extra args" cases.
    return func(dataframe, *args)
@ray.remote(num_return_vals=2)
def _compute_length_and_index(dfs):
    """Compute partition lengths and a default positional index mapping.

    Args:
        dfs: The partitions to measure; each one is handed to
            _deploy_func.remote together with _get_lengths.

    Returns:
        Two values: the list of per-partition lengths, and a pd.DataFrame
        with one row per element holding its "partition" number and its
        "index_within_partition" offset.
    """
    length_futures = [_deploy_func.remote(_get_lengths, d) for d in dfs]
    lengths = ray.get(length_futures)

    # One entry per row: which partition the row lives in ...
    partition_ids = [part
                     for part, size in enumerate(lengths)
                     for _ in range(size)]
    # ... and the row's position inside that partition.
    offsets = [offset for size in lengths for offset in range(size)]

    dest_indices = {"partition": partition_ids,
                    "index_within_partition": offsets}
    return lengths, pd.DataFrame(dest_indices)