From 737120952e41ad59ec788d79bd9c6f815a2c3b68 Mon Sep 17 00:00:00 2001 From: Jae Min Kim Date: Mon, 12 Mar 2018 19:13:33 -0700 Subject: [PATCH] [Dataframes] Reorganization (#1676) * moved helper functions for dataframes into df_utils * Updating base on review comments * fixed bug with from_pandas * Updating formatting * Fix lint --- python/ray/dataframe/__init__.py | 5 +- python/ray/dataframe/dataframe.py | 158 +------------------- python/ray/dataframe/test/test_dataframe.py | 146 +++++++++--------- python/ray/dataframe/utils.py | 157 +++++++++++++++++++ 4 files changed, 239 insertions(+), 227 deletions(-) create mode 100644 python/ray/dataframe/utils.py diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 824ef367a..b23a268aa 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -28,14 +28,11 @@ def get_npartitions(): # We import these file after above two function # because they depend on npartitions. from .dataframe import DataFrame # noqa: 402 -from .dataframe import from_pandas # noqa: 402 -from .dataframe import to_pandas # noqa: 402 from .series import Series # noqa: 402 from .io import (read_csv, read_parquet) # noqa: 402 __all__ = [ - "DataFrame", "from_pandas", "to_pandas", "Series", "read_csv", - "read_parquet" + "DataFrame", "Series", "read_csv", "read_parquet" ] try: diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index a0de0db01..dd4dd11eb 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -18,6 +18,13 @@ import warnings import numpy as np import ray import itertools +from .utils import ( + _get_lengths, + to_pandas, + _shuffle, + _local_groupby, + _deploy_func, + _compute_length_and_index) class DataFrame(object): @@ -2777,154 +2784,3 @@ class DataFrame(object): """ from .indexing import _iLoc_Indexer return _iLoc_Indexer(self) - - -def _get_lengths(df): - """Gets the length of the dataframe. - - Args: - df: A remote pd.DataFrame object. - - Returns: - Returns an integer length of the dataframe object. If the attempt - fails, returns 0 as the length. - """ - try: - return len(df) - # Because we sometimes have cases where we have summary statistics in our - # DataFrames - except TypeError: - return 0 - - -@ray.remote -def _shuffle(df, indices, chunksize): - """Shuffle data by sending it through the Ray Store. - - Args: - df (pd.DataFrame): The pandas DataFrame to shuffle. - indices ([any]): The list of indices for the DataFrame. - chunksize (int): The number of indices to send. - - Returns: - The list of pd.DataFrame objects in order of their assignment. This - order is important because it determines which task will get the data. - """ - i = 0 - partition = [] - while len(indices) > chunksize: - oids = df.reindex(indices[:chunksize]) - partition.append(oids) - indices = indices[chunksize:] - i += 1 - else: - oids = df.reindex(indices) - partition.append(oids) - return partition - - -@ray.remote -def _local_groupby(df_rows, axis=0): - """Apply a groupby on this partition for the blocks sent to it. - - Args: - df_rows ([pd.DataFrame]): A list of dataframes for this partition. Goes - through the Ray object store. - - Returns: - A DataFrameGroupBy object from the resulting groupby. - """ - concat_df = pd.concat(df_rows, axis=axis) - return concat_df.groupby(concat_df.index) - - -@ray.remote -def _deploy_func(func, dataframe, *args): - """Deploys a function for the _map_partitions call. - - Args: - dataframe (pandas.DataFrame): The pandas DataFrame for this partition. - - Returns: - A futures object representing the return value of the function - provided. - """ - if len(args) == 0: - return func(dataframe) - else: - return func(dataframe, *args) - - -def from_pandas(df, npartitions=None, chunksize=None, sort=True): - """Converts a pandas DataFrame to a Ray DataFrame. - - Args: - df (pandas.DataFrame): The pandas DataFrame to convert. - npartitions (int): The number of partitions to split the DataFrame - into. Has priority over chunksize. - chunksize (int): The number of rows to put in each partition. - sort (bool): Whether or not to sort the df as it is being converted. - - Returns: - A new Ray DataFrame object. - """ - - if npartitions is not None: - chunksize = int(len(df) / npartitions) - elif chunksize is None: - raise ValueError("The number of partitions or chunksize must be set.") - - temp_df = df - - dataframes = [] - lengths = [] - while len(temp_df) > chunksize: - t_df = temp_df[:chunksize] - lengths.append(len(t_df)) - # reset_index here because we want a pd.RangeIndex - # within the partitions. It is smaller and sometimes faster. - t_df = t_df.reset_index(drop=True) - top = ray.put(t_df) - dataframes.append(top) - temp_df = temp_df[chunksize:] - else: - temp_df = temp_df.reset_index(drop=True) - dataframes.append(ray.put(temp_df)) - lengths.append(len(temp_df)) - - return DataFrame(dataframes, df.columns, index=df.index) - - -def to_pandas(df): - """Converts a Ray DataFrame to a pandas DataFrame/Series. - - Args: - df (ray.DataFrame): The Ray DataFrame to convert. - - Returns: - A new pandas DataFrame. - """ - pd_df = pd.concat(ray.get(df._df)) - pd_df.index = df.index - pd_df.columns = df.columns - return pd_df - - -@ray.remote(num_return_vals=2) -def _compute_length_and_index(dfs): - """Create a default index, which is a RangeIndex - - Returns: - The pd.RangeIndex object that represents this DataFrame. - """ - lengths = ray.get([_deploy_func.remote(_get_lengths, d) - for d in dfs]) - - dest_indices = {"partition": - [i for i in range(len(lengths)) - for j in range(lengths[i])], - "index_within_partition": - [j for i in range(len(lengths)) - for j in range(lengths[i])]} - - return lengths, pd.DataFrame(dest_indices) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index ec6041b60..80165eae9 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -7,19 +7,23 @@ import numpy as np import pandas as pd import pandas.util.testing as tm import ray.dataframe as rdf +from ray.dataframe.utils import ( + to_pandas, + from_pandas +) from pandas.tests.frame.common import TestData @pytest.fixture def ray_df_equals_pandas(ray_df, pandas_df): - return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) + return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) @pytest.fixture def ray_df_equals(ray_df1, ray_df2): - return rdf.to_pandas(ray_df1).sort_index().equals( - rdf.to_pandas(ray_df2).sort_index() + return to_pandas(ray_df1).sort_index().equals( + to_pandas(ray_df2).sort_index() ) @@ -150,7 +154,7 @@ def create_test_dataframe(): 'col4': [12, 13, 14, 15], 'col5': [0, 0, 0, 0]}) - return rdf.from_pandas(df, 2) + return from_pandas(df, 2) def test_int_dataframe(): @@ -160,7 +164,7 @@ def test_int_dataframe(): 'col3': [8, 9, 10, 11], 'col4': [12, 13, 14, 15], 'col5': [0, 0, 0, 0]}) - ray_df = rdf.from_pandas(pandas_df, 2) + ray_df = from_pandas(pandas_df, 2) testfuncs = [lambda x: x + 1, lambda x: str(x), @@ -270,7 +274,7 @@ def test_float_dataframe(): 'col4': [12.0, 13.0, 14.0, 15.0], 'col5': [0.0, 0.0, 0.0, 0.0]}) - ray_df = rdf.from_pandas(pandas_df, 3) + ray_df = from_pandas(pandas_df, 3) testfuncs = [lambda x: x + 1, lambda x: str(x), @@ -379,7 +383,7 @@ def test_mixed_dtype_dataframe(): 'col3': [8.0, 9.4, 10.1, 11.3], 'col4': ['a', 'b', 'c', 'd']}) - ray_df = rdf.from_pandas(pandas_df, 2) + ray_df = from_pandas(pandas_df, 2) testfuncs = [lambda x: x + x, lambda x: str(x), @@ -496,7 +500,7 @@ def test_nan_dataframe(): 'col3': [8, np.nan, 10, 11], 'col4': [np.nan, 13, 14, 15]}) - ray_df = rdf.from_pandas(pandas_df, 2) + ray_df = from_pandas(pandas_df, 2) testfuncs = [lambda x: x + x, lambda x: str(x), @@ -711,7 +715,7 @@ def test_bfill(num_partitions=2): test_data = TestData() test_data.tsframe['A'][:5] = np.nan test_data.tsframe['A'][-5:] = np.nan - ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + ray_df = from_pandas(test_data.tsframe, num_partitions) assert ray_df_equals_pandas( ray_df.bfill(), test_data.tsframe.bfill() @@ -725,7 +729,7 @@ def test_bool(ray_df, pd_df): pd_df.bool() single_bool_pd_df = pd.DataFrame([True]) - single_bool_ray_df = rdf.from_pandas(single_bool_pd_df, 1) + single_bool_ray_df = from_pandas(single_bool_pd_df, 1) assert single_bool_pd_df.bool() == single_bool_ray_df.bool() @@ -886,7 +890,7 @@ def test_dot(): def test_drop(): ray_df = create_test_dataframe() simple = pd.DataFrame({"A": [1, 2, 3, 4], "B": [0, 1, 2, 3]}) - ray_simple = rdf.from_pandas(simple, 2) + ray_simple = from_pandas(simple, 2) assert ray_df_equals_pandas(ray_simple.drop("A", axis=1), simple[['B']]) assert ray_df_equals_pandas(ray_simple.drop(["A", "B"], axis='columns'), simple[[]]) @@ -913,7 +917,7 @@ def test_drop(): # non-unique - wheee! nu_df = pd.DataFrame(pd.compat.lzip(range(3), range(-3, 1), list('abc')), columns=['a', 'a', 'b']) - ray_nu_df = rdf.from_pandas(nu_df, 3) + ray_nu_df = from_pandas(nu_df, 3) assert ray_df_equals_pandas(ray_nu_df.drop('a', axis=1), nu_df[['b']]) assert ray_df_equals_pandas(ray_nu_df.drop('b', axis='columns'), nu_df['a']) @@ -921,7 +925,7 @@ def test_drop(): nu_df = nu_df.set_index(pd.Index(['X', 'Y', 'X'])) nu_df.columns = list('abc') - ray_nu_df = rdf.from_pandas(nu_df, 3) + ray_nu_df = from_pandas(nu_df, 3) assert ray_df_equals_pandas(ray_nu_df.drop('X', axis='rows'), nu_df.loc[["Y"], :]) assert ray_df_equals_pandas(ray_nu_df.drop(['X', 'Y'], axis=0), @@ -930,7 +934,7 @@ def test_drop(): # inplace cache issue # GH 5628 df = pd.DataFrame(np.random.randn(10, 3), columns=list('abc')) - ray_df = rdf.from_pandas(df, 2) + ray_df = from_pandas(df, 2) expected = df[~(df.b > 0)] ray_df.drop(labels=df[df.b > 0].index, inplace=True) assert ray_df_equals_pandas(ray_df, expected) @@ -941,7 +945,7 @@ def test_drop_api_equivalence(): df = pd.DataFrame([[1, 2, 3], [3, 4, 5], [5, 6, 7]], index=['a', 'b', 'c'], columns=['d', 'e', 'f']) - ray_df = rdf.from_pandas(df, 3) + ray_df = from_pandas(df, 3) res1 = ray_df.drop('a') res2 = ray_df.drop(index='a') @@ -997,14 +1001,14 @@ def test_eq(): def test_equals(): pandas_df1 = pd.DataFrame({'col1': [2.9, 3, 3, 3], 'col2': [2, 3, 4, 1]}) - ray_df1 = rdf.from_pandas(pandas_df1, 2) - ray_df2 = rdf.from_pandas(pandas_df1, 3) + ray_df1 = from_pandas(pandas_df1, 2) + ray_df2 = from_pandas(pandas_df1, 3) assert ray_df1.equals(ray_df2) pandas_df2 = pd.DataFrame({'col1': [2.9, 3, 3, 3], 'col2': [2, 3, 5, 1]}) - ray_df3 = rdf.from_pandas(pandas_df2, 4) + ray_df3 = from_pandas(pandas_df2, 4) assert not ray_df3.equals(ray_df1) assert not ray_df3.equals(ray_df2) @@ -1013,7 +1017,7 @@ def test_equals(): def test_eval_df_use_case(): df = pd.DataFrame({'a': np.random.randn(10), 'b': np.random.randn(10)}) - ray_df = rdf.from_pandas(df, 5) + ray_df = from_pandas(df, 5) df.eval("e = arctan2(sin(a), b)", engine='python', parser='pandas', inplace=True) @@ -1029,7 +1033,7 @@ def test_eval_df_use_case(): def test_eval_df_arithmetic_subexpression(): df = pd.DataFrame({'a': np.random.randn(10), 'b': np.random.randn(10)}) - ray_df = rdf.from_pandas(df, 5) + ray_df = from_pandas(df, 5) df.eval("e = sin(a + b)", engine='python', parser='pandas', inplace=True) @@ -1061,7 +1065,7 @@ def test_ffill(num_partitions=2): test_data = TestData() test_data.tsframe['A'][:5] = np.nan test_data.tsframe['A'][-5:] = np.nan - ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + ray_df = from_pandas(test_data.tsframe, num_partitions) assert ray_df_equals_pandas( ray_df.ffill(), test_data.tsframe.ffill() @@ -1096,12 +1100,12 @@ def test_fillna_sanity(num_partitions=2): tf.loc[tf.index[-5:], 'A'] = np.nan zero_filled = test_data.tsframe.fillna(0) - ray_df = rdf.from_pandas(test_data.tsframe, num_partitions).fillna(0) + ray_df = from_pandas(test_data.tsframe, num_partitions).fillna(0) assert ray_df_equals_pandas(ray_df, zero_filled) padded = test_data.tsframe.fillna(method='pad') - ray_df = rdf.from_pandas(test_data.tsframe, - num_partitions).fillna(method='pad') + ray_df = from_pandas(test_data.tsframe, + num_partitions).fillna(method='pad') assert ray_df_equals_pandas(ray_df, padded) # mixed type @@ -1110,20 +1114,20 @@ def test_fillna_sanity(num_partitions=2): mf.loc[mf.index[-10:], 'A'] = np.nan result = test_data.mixed_frame.fillna(value=0) - ray_df = rdf.from_pandas(test_data.mixed_frame, - num_partitions).fillna(value=0) + ray_df = from_pandas(test_data.mixed_frame, + num_partitions).fillna(value=0) assert ray_df_equals_pandas(ray_df, result) result = test_data.mixed_frame.fillna(method='pad') - ray_df = rdf.from_pandas(test_data.mixed_frame, - num_partitions).fillna(method='pad') + ray_df = from_pandas(test_data.mixed_frame, + num_partitions).fillna(method='pad') assert ray_df_equals_pandas(ray_df, result) pytest.raises(ValueError, test_data.tsframe.fillna) - pytest.raises(ValueError, rdf.from_pandas(test_data.tsframe, - num_partitions).fillna) + pytest.raises(ValueError, from_pandas(test_data.tsframe, + num_partitions).fillna) with pytest.raises(ValueError): - rdf.from_pandas(test_data.tsframe, num_partitions).fillna( + from_pandas(test_data.tsframe, num_partitions).fillna( 5, method='ffill' ) @@ -1131,11 +1135,11 @@ def test_fillna_sanity(num_partitions=2): mf = test_data.mixed_float.reindex(columns=['A', 'B', 'D']) mf.loc[mf.index[-10:], 'A'] = np.nan result = mf.fillna(value=0) - ray_df = rdf.from_pandas(mf, num_partitions).fillna(value=0) + ray_df = from_pandas(mf, num_partitions).fillna(value=0) assert ray_df_equals_pandas(ray_df, result) result = mf.fillna(method='pad') - ray_df = rdf.from_pandas(mf, num_partitions).fillna(method='pad') + ray_df = from_pandas(mf, num_partitions).fillna(method='pad') assert ray_df_equals_pandas(ray_df, result) # TODO: Use this when Arrow issue resolves: @@ -1151,10 +1155,10 @@ def test_fillna_sanity(num_partitions=2): 'b', 'b', np.nan, 'b'], ['c', 'c', np.nan, 'c']]) result = df.fillna({2: 'foo'}) - ray_df = rdf.from_pandas(df, num_partitions).fillna({2: 'foo'}) + ray_df = from_pandas(df, num_partitions).fillna({2: 'foo'}) assert ray_df_equals_pandas(ray_df, result) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) df.fillna({2: 'foo'}, inplace=True) ray_df.fillna({2: 'foo'}, inplace=True) assert ray_df_equals_pandas(ray_df, result) @@ -1165,7 +1169,7 @@ def test_fillna_sanity(num_partitions=2): df.iloc[3:5, 2] = np.nan # result = df.fillna(999, limit=1) - # ray_df = rdf.from_pandas(df, num_partitions).fillna(999, limit=1) + # ray_df = from_pandas(df, num_partitions).fillna(999, limit=1) # assert ray_df_equals_pandas(ray_df, result) @@ -1176,7 +1180,7 @@ def test_fillna_sanity(num_partitions=2): 'Date2': [pd.Timestamp("2013-1-1"), pd.NaT] }) result = df.fillna(value={'Date': df['Date2']}) - ray_df = rdf.from_pandas(df, num_partitions).fillna( + ray_df = from_pandas(df, num_partitions).fillna( value={'Date': df['Date2']} ) assert ray_df_equals_pandas(ray_df, result) @@ -1188,13 +1192,13 @@ def test_fillna_sanity(num_partitions=2): """ df = pd.DataFrame({'A': [pd.Timestamp('2012-11-11 00:00:00+01:00'), pd.NaT]}) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas(ray_df.fillna(method='pad'), df.fillna(method='pad')) df = pd.DataFrame({'A': [pd.NaT, pd.Timestamp('2012-11-11 00:00:00+01:00')]}) - ray_df = rdf.from_pandas(df, num_partitions).fillna(method='bfill') + ray_df = from_pandas(df, num_partitions).fillna(method='bfill') assert ray_df_equals_pandas(ray_df, df.fillna(method='bfill')) """ @@ -1205,13 +1209,13 @@ def test_fillna_downcast(num_partitions=2): # infer int64 from float64 df = pd.DataFrame({'a': [1., np.nan]}) result = df.fillna(0, downcast='infer') - ray_df = rdf.from_pandas(df, num_partitions).fillna(0, downcast='infer') + ray_df = from_pandas(df, num_partitions).fillna(0, downcast='infer') assert ray_df_equals_pandas(ray_df, result) # infer int64 from float64 when fillna value is a dict df = pd.DataFrame({'a': [1., np.nan]}) result = df.fillna({'a': 0}, downcast='infer') - ray_df = rdf.from_pandas(df, num_partitions).fillna( + ray_df = from_pandas(df, num_partitions).fillna( {'a': 0}, downcast='infer' ) assert ray_df_equals_pandas(ray_df, result) @@ -1222,7 +1226,7 @@ def test_ffill2(num_partitions=2): test_data = TestData() test_data.tsframe['A'][:5] = np.nan test_data.tsframe['A'][-5:] = np.nan - ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + ray_df = from_pandas(test_data.tsframe, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='ffill'), test_data.tsframe.fillna(method='ffill') @@ -1234,7 +1238,7 @@ def test_bfill2(num_partitions=2): test_data = TestData() test_data.tsframe['A'][:5] = np.nan test_data.tsframe['A'][-5:] = np.nan - ray_df = rdf.from_pandas(test_data.tsframe, num_partitions) + ray_df = from_pandas(test_data.tsframe, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='bfill'), test_data.tsframe.fillna(method='bfill') @@ -1247,20 +1251,20 @@ def test_fillna_inplace(num_partitions=2): df[1][:4] = np.nan df[3][-4:] = np.nan - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) df.fillna(value=0, inplace=True) assert not ray_df_equals_pandas(ray_df, df) ray_df.fillna(value=0, inplace=True) assert ray_df_equals_pandas(ray_df, df) - ray_df = rdf.from_pandas(df, num_partitions).fillna(value={0: 0}, - inplace=True) + ray_df = from_pandas(df, num_partitions).fillna(value={0: 0}, + inplace=True) assert ray_df is None df[1][:4] = np.nan df[3][-4:] = np.nan - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) df.fillna(method='ffill', inplace=True) assert not ray_df_equals_pandas(ray_df, df) @@ -1277,14 +1281,14 @@ def test_frame_fillna_limit(num_partitions=2): expected = df[:2].reindex(index) expected = expected.fillna(method='pad', limit=5) - ray_df = rdf.from_pandas(df[:2].reindex(index), num_partitions).fillna( + ray_df = from_pandas(df[:2].reindex(index), num_partitions).fillna( method='pad', limit=5 ) assert ray_df_equals_pandas(ray_df, expected) expected = df[-2:].reindex(index) expected = expected.fillna(method='backfill', limit=5) - ray_df = rdf.from_pandas(df[-2:].reindex(index), num_partitions).fillna( + ray_df = from_pandas(df[-2:].reindex(index), num_partitions).fillna( method='backfill', limit=5 ) assert ray_df_equals_pandas(ray_df, expected) @@ -1296,14 +1300,14 @@ def test_frame_pad_backfill_limit(num_partitions=2): df = pd.DataFrame(np.random.randn(10, 4), index=index) result = df[:2].reindex(index) - ray_df = rdf.from_pandas(result, num_partitions) + ray_df = from_pandas(result, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='pad', limit=5), result.fillna(method='pad', limit=5) ) result = df[-2:].reindex(index) - ray_df = rdf.from_pandas(result, num_partitions) + ray_df = from_pandas(result, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='backfill', limit=5), result.fillna(method='backfill', limit=5) @@ -1317,7 +1321,7 @@ def test_fillna_dtype_conversion(num_partitions=2): # empty block df = pd.DataFrame(index=range(3), columns=['A', 'B'], dtype='float64') - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna('nan'), df.fillna('nan') @@ -1325,7 +1329,7 @@ def test_fillna_dtype_conversion(num_partitions=2): # equiv of replace df = pd.DataFrame(dict(A=[1, np.nan], B=[1., 2.])) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) for v in ['', 1, np.nan, 1.0]: assert ray_df_equals_pandas( ray_df.fillna(v), @@ -1338,7 +1342,7 @@ def test_fillna_skip_certain_blocks(num_partitions=2): # don't try to fill boolean, int blocks df = pd.DataFrame(np.random.randn(10, 4).astype(int)) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) # it works! assert ray_df_equals_pandas( @@ -1352,7 +1356,7 @@ def test_fillna_dict_series(num_partitions=2): df = pd.DataFrame({'a': [np.nan, 1, 2, np.nan, np.nan], 'b': [1, 2, 3, np.nan, np.nan], 'c': [np.nan, 1, 2, 3, 4]}) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna({'a': 0, 'b': 5}), @@ -1379,7 +1383,7 @@ def test_fillna_dataframe(num_partitions=2): 'b': [1, 2, 3, np.nan, np.nan], 'c': [np.nan, 1, 2, 3, 4]}, index=list('VWXYZ')) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) # df2 may have different index and columns df2 = pd.DataFrame({'a': [np.nan, 10, 20, 30, 40], @@ -1398,7 +1402,7 @@ def test_fillna_dataframe(num_partitions=2): def test_fillna_columns(num_partitions=2): df = pd.DataFrame(np.random.randn(10, 10)) df.values[:, ::2] = np.nan - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='ffill', axis=1), @@ -1406,7 +1410,7 @@ def test_fillna_columns(num_partitions=2): ) df.insert(6, 'foo', 5) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='ffill', axis=1), df.fillna(method='ffill', axis=1) @@ -1416,7 +1420,7 @@ def test_fillna_columns(num_partitions=2): @pytest.fixture def test_fillna_invalid_method(num_partitions=2): test_data = TestData() - ray_df = rdf.from_pandas(test_data.frame, num_partitions) + ray_df = from_pandas(test_data.frame, num_partitions) with tm.assert_raises_regex(ValueError, 'ffil'): ray_df.fillna(method='ffil') @@ -1424,7 +1428,7 @@ def test_fillna_invalid_method(num_partitions=2): @pytest.fixture def test_fillna_invalid_value(num_partitions=2): test_data = TestData() - ray_df = rdf.from_pandas(test_data.frame, num_partitions) + ray_df = from_pandas(test_data.frame, num_partitions) # list pytest.raises(TypeError, ray_df.fillna, [1, 2]) # tuple @@ -1439,7 +1443,7 @@ def test_fillna_col_reordering(num_partitions=2): cols = ["COL." + str(i) for i in range(5, 0, -1)] data = np.random.rand(20, 5) df = pd.DataFrame(index=range(20), columns=cols, data=data) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna(method='ffill'), df.fillna(method='ffill') @@ -1457,7 +1461,7 @@ def test_fillna_datetime_columns(num_partitions=2): 'C': ['foo', 'bar', None], 'D': ['foo2', 'bar2', None]}, index=date_range('20130110', periods=3)) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna('?'), df.fillna('?') @@ -1469,7 +1473,7 @@ def test_fillna_datetime_columns(num_partitions=2): 'C': ['foo', 'bar', None], 'D': ['foo2', 'bar2', None]}, index=date_range('20130110', periods=3)) - ray_df = rdf.from_pandas(df, num_partitions) + ray_df = from_pandas(df, num_partitions) assert ray_df_equals_pandas( ray_df.fillna('?'), df.fillna('?') @@ -1915,7 +1919,7 @@ def test_query(ray_df, pandas_df, funcs): for f in funcs: pandas_df_new, ray_df_new = pandas_df.query(f), ray_df.query(f) - assert pandas_df_new.equals(rdf.to_pandas(ray_df_new)) + assert pandas_df_new.equals(to_pandas(ray_df_new)) def test_radd(): @@ -2237,16 +2241,14 @@ def test_resample(): @pytest.fixture def test_reset_index(ray_df, pandas_df, inplace=False): if not inplace: - print(rdf.to_pandas(ray_df.reset_index(inplace=inplace)).index) - print(pandas_df.reset_index(inplace=inplace)) - assert rdf.to_pandas(ray_df.reset_index(inplace=inplace)).equals( + assert to_pandas(ray_df.reset_index(inplace=inplace)).equals( pandas_df.reset_index(inplace=inplace)) else: ray_df_cp = ray_df.copy() pd_df_cp = pandas_df.copy() ray_df_cp.reset_index(inplace=inplace) pd_df_cp.reset_index(inplace=inplace) - assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp) + assert to_pandas(ray_df_cp).equals(pd_df_cp) def test_rfloordiv(): @@ -2334,21 +2336,21 @@ def test_sem(): @pytest.fixture def test_set_axis(ray_df, pandas_df, label, axis): - assert rdf.to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals( + assert to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals( pandas_df.set_axis(label, axis, inplace=False)) @pytest.fixture def test_set_index(ray_df, pandas_df, keys, inplace=False): if not inplace: - assert rdf.to_pandas(ray_df.set_index(keys)).equals( + assert to_pandas(ray_df.set_index(keys)).equals( pandas_df.set_index(keys)) else: ray_df_cp = ray_df.copy() pd_df_cp = pandas_df.copy() ray_df_cp.set_index(keys, inplace=inplace) pd_df_cp.set_index(keys, inplace=inplace) - assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp) + assert to_pandas(ray_df_cp).equals(pd_df_cp) def test_set_value(): @@ -2730,7 +2732,7 @@ def test___unicode__(): @pytest.fixture def test___neg__(ray_df, pd_df): ray_df_neg = ray_df.__neg__() - assert pd_df.__neg__().equals(rdf.to_pandas(ray_df_neg)) + assert pd_df.__neg__().equals(to_pandas(ray_df_neg)) def test___invert__(): diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py new file mode 100644 index 000000000..777f1033b --- /dev/null +++ b/python/ray/dataframe/utils.py @@ -0,0 +1,157 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pandas as pd +import ray + + +def _get_lengths(df): + """Gets the length of the dataframe. + + Args: + df: A remote pd.DataFrame object. + + Returns: + Returns an integer length of the dataframe object. If the attempt + fails, returns 0 as the length. + """ + try: + return len(df) + # Because we sometimes have cases where we have summary statistics in our + # DataFrames + except TypeError: + return 0 + + +def from_pandas(df, npartitions=None, chunksize=None): + """Converts a pandas DataFrame to a Ray DataFrame. + + Args: + df (pandas.DataFrame): The pandas DataFrame to convert. + npartitions (int): The number of partitions to split the DataFrame + into. Has priority over chunksize. + chunksize (int): The number of rows to put in each partition. + + Returns: + A new Ray DataFrame object. + """ + from .dataframe import DataFrame + + if npartitions is not None: + chunksize = int(len(df) / npartitions) + elif chunksize is None: + raise ValueError("The number of partitions or chunksize must be set.") + + temp_df = df + + dataframes = [] + lengths = [] + while len(temp_df) > chunksize: + t_df = temp_df[:chunksize] + lengths.append(len(t_df)) + # reset_index here because we want a pd.RangeIndex + # within the partitions. It is smaller and sometimes faster. + t_df = t_df.reset_index(drop=True) + top = ray.put(t_df) + dataframes.append(top) + temp_df = temp_df[chunksize:] + else: + temp_df = temp_df.reset_index(drop=True) + dataframes.append(ray.put(temp_df)) + lengths.append(len(temp_df)) + + return DataFrame(dataframes, df.columns, index=df.index) + + +def to_pandas(df): + """Converts a Ray DataFrame to a pandas DataFrame/Series. + + Args: + df (ray.DataFrame): The Ray DataFrame to convert. + + Returns: + A new pandas DataFrame. + """ + pd_df = pd.concat(ray.get(df._df)) + pd_df.index = df.index + pd_df.columns = df.columns + return pd_df + + +@ray.remote +def _shuffle(df, indices, chunksize): + """Shuffle data by sending it through the Ray Store. + + Args: + df (pd.DataFrame): The pandas DataFrame to shuffle. + indices ([any]): The list of indices for the DataFrame. + chunksize (int): The number of indices to send. + + Returns: + The list of pd.DataFrame objects in order of their assignment. This + order is important because it determines which task will get the data. + """ + i = 0 + partition = [] + while len(indices) > chunksize: + oids = df.reindex(indices[:chunksize]) + partition.append(oids) + indices = indices[chunksize:] + i += 1 + else: + oids = df.reindex(indices) + partition.append(oids) + return partition + + +@ray.remote +def _local_groupby(df_rows, axis=0): + """Apply a groupby on this partition for the blocks sent to it. + + Args: + df_rows ([pd.DataFrame]): A list of dataframes for this partition. Goes + through the Ray object store. + + Returns: + A DataFrameGroupBy object from the resulting groupby. + """ + concat_df = pd.concat(df_rows, axis=axis) + return concat_df.groupby(concat_df.index) + + +@ray.remote +def _deploy_func(func, dataframe, *args): + """Deploys a function for the _map_partitions call. + + Args: + dataframe (pandas.DataFrame): The pandas DataFrame for this partition. + + Returns: + A futures object representing the return value of the function + provided. + """ + if len(args) == 0: + return func(dataframe) + else: + return func(dataframe, *args) + + +@ray.remote(num_return_vals=2) +def _compute_length_and_index(dfs): + """Create a default index, which is a RangeIndex + + Returns: + The pd.RangeIndex object that represents this DataFrame. + """ + lengths = ray.get([_deploy_func.remote(_get_lengths, d) + for d in dfs]) + + dest_indices = {"partition": + [i for i in range(len(lengths)) + for j in range(lengths[i])], + "index_within_partition": + [j for i in range(len(lengths)) + for j in range(lengths[i])]} + + return lengths, pd.DataFrame(dest_indices)