From 1d1df7bbec1ddd13b561fd633877469430d3de65 Mon Sep 17 00:00:00 2001 From: Devin Petersohn Date: Mon, 23 Apr 2018 17:09:57 -0700 Subject: [PATCH] [DataFrame] Fully implement append, concat and join (#1932) --- python/ray/dataframe/concat.py | 184 ++++++++++++-------- python/ray/dataframe/dataframe.py | 137 ++++++++++++++- python/ray/dataframe/test/test_concat.py | 90 ++++++---- python/ray/dataframe/test/test_dataframe.py | 44 ++++- python/ray/dataframe/utils.py | 24 +++ 5 files changed, 359 insertions(+), 120 deletions(-) diff --git a/python/ray/dataframe/concat.py b/python/ray/dataframe/concat.py index 3271bdb7f..6d1664ca5 100644 --- a/python/ray/dataframe/concat.py +++ b/python/ray/dataframe/concat.py @@ -1,90 +1,130 @@ -import pandas as pd -import numpy as np -from .dataframe import DataFrame as rdf -from .utils import ( - from_pandas, - _deploy_func) -from functools import reduce +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pandas +from .dataframe import DataFrame +from .utils import _reindex_helper def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False, keys=None, levels=None, names=None, verify_integrity=False, copy=True): - def _concat(frame1, frame2): - # Check type on objects - # Case 1: Both are Pandas DF - if isinstance(frame1, pd.DataFrame) and \ - isinstance(frame2, pd.DataFrame): + if keys is not None: + objs = [objs[k] for k in keys] + else: + objs = list(objs) - return pd.concat((frame1, frame2), axis, join, join_axes, + if len(objs) == 0: + raise ValueError("No objects to concatenate") + + objs = [obj for obj in objs if obj is not None] + + if len(objs) == 0: + raise ValueError("All objects passed were None") + + try: + type_check = next(obj for obj in objs + if not isinstance(obj, (pandas.Series, + pandas.DataFrame, + DataFrame))) + except StopIteration: + type_check = None + if type_check is not None: + raise ValueError("cannot concatenate object of type \"{0}\"; only " + "pandas.Series, pandas.DataFrame, " + "and ray.dataframe.DataFrame objs are " + "valid", type(type_check)) + + all_series = all([isinstance(obj, pandas.Series) + for obj in objs]) + if all_series: + return pandas.concat(objs, axis, join, join_axes, ignore_index, keys, levels, names, verify_integrity, copy) - if not (isinstance(frame1, rdf) and - isinstance(frame2, rdf)) and join == 'inner': - raise NotImplementedError( - "Obj as dicts not implemented. To contribute to " - "Pandas on Ray, please visit github.com/ray-project/ray." - ) - - # Case 2: Both are different types - if isinstance(frame1, pd.DataFrame): - frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1) - if isinstance(frame2, pd.DataFrame): - frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1) - - # Case 3: Both are Ray DF - if isinstance(frame1, rdf) and \ - isinstance(frame2, rdf): - - new_columns = frame1.columns.join(frame2.columns, how=join) - - def _reindex_helper(pdf, old_columns, join): - pdf.columns = old_columns - if join == 'outer': - pdf = pdf.reindex(columns=new_columns) - else: - pdf = pdf[new_columns] - pdf.columns = pd.RangeIndex(len(new_columns)) - - return pdf - - f1_columns, f2_columns = frame1.columns, frame2.columns - new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p, - f1_columns, join), part) for - part in frame1._row_partitions] - new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p, - f2_columns, join), part) for - part in frame2._row_partitions] - - return rdf(row_partitions=new_f1 + new_f2, columns=new_columns, - index=frame1.index.append(frame2.index)) - - # (TODO) Group all the pandas dataframes - if isinstance(objs, dict): raise NotImplementedError( "Obj as dicts not implemented. To contribute to " - "Pandas on Ray, please visit github.com/ray-project/ray." - ) + "Pandas on Ray, please visit github.com/ray-project/ray.") - axis = pd.DataFrame()._get_axis_number(axis) - if axis == 1: - raise NotImplementedError( - "Concat not implemented for axis=1. To contribute to " - "Pandas on Ray, please visit github.com/ray-project/ray." - ) + axis = pandas.DataFrame()._get_axis_number(axis) - all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs]) - if all_pd: - result = pd.concat(objs, axis, join, join_axes, - ignore_index, keys, levels, names, - verify_integrity, copy) + if join not in ['inner', 'outer']: + raise ValueError("Only can inner (intersect) or outer (union) join the" + " other axis") + + # We need this in a list because we use it later. + all_index, all_columns = list(zip(*[(obj.index, obj.columns) + for obj in objs])) + + def series_to_df(series, columns): + df = pandas.DataFrame(series) + df.columns = columns + return DataFrame(df) + + # Pandas puts all of the Series in a single column named 0. This is + # true regardless of the existence of another column named 0 in the + # concat. + if axis == 0: + objs = [series_to_df(obj, [0]) + if isinstance(obj, pandas.Series) else obj for obj in objs] else: - result = reduce(_concat, objs) + # Pandas starts the count at 0 so this will increment the names as + # long as there's a new nameless Series being added. + def name_incrementer(i): + val = i[0] + i[0] += 1 + return val - if isinstance(result, pd.DataFrame): - return from_pandas(result, len(result) / 2**16 + 1) + i = [0] + objs = [series_to_df(obj, obj.name if obj.name is not None + else name_incrementer(i)) + if isinstance(obj, pandas.Series) else obj for obj in objs] - return result + # Using concat on the columns and index is fast because they're empty, + # and it forces the error checking. It also puts the columns in the + # correct order for us. + final_index = \ + pandas.concat([pandas.DataFrame(index=idx) for idx in all_index], + axis=axis, join=join, join_axes=join_axes, + ignore_index=ignore_index, keys=keys, levels=levels, + names=names, verify_integrity=verify_integrity, + copy=False).index + final_columns = \ + pandas.concat([pandas.DataFrame(columns=col) + for col in all_columns], + axis=axis, join=join, join_axes=join_axes, + ignore_index=ignore_index, keys=keys, levels=levels, + names=names, verify_integrity=verify_integrity, + copy=False).columns + + # Put all of the DataFrames into Ray format + # TODO just partition the DataFrames instead of building a new Ray DF. + objs = [DataFrame(obj) if isinstance(obj, (pandas.DataFrame, + pandas.Series)) else obj + for obj in objs] + + # Here we reuse all_columns/index so we don't have to materialize objects + # from remote memory built in the previous line. In the future, we won't be + # building new DataFrames, rather just partitioning the DataFrames. + if axis == 0: + new_rows = [_reindex_helper.remote(part, all_columns[i], + final_columns, axis) + for i in range(len(objs)) + for part in objs[i]._row_partitions] + + return DataFrame(row_partitions=new_rows, + columns=final_columns, + index=final_index) + + else: + new_columns = [_reindex_helper.remote(part, all_index[i], + final_index, axis) + for i in range(len(objs)) + for part in objs[i]._col_partitions] + + return DataFrame(col_partitions=new_columns, + columns=final_columns, + index=final_index) diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 3253cd0a1..df7da328f 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -35,7 +35,8 @@ from .utils import ( _blocks_to_col, _blocks_to_row, _create_block_partitions, - _inherit_docstrings) + _inherit_docstrings, + _reindex_helper) from . import get_npartitions from .index_metadata import _IndexMetadata @@ -911,9 +912,49 @@ class DataFrame(object): return self._arithmetic_helper(remote_func, axis, level) def append(self, other, ignore_index=False, verify_integrity=False): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Append another DataFrame/list/Series to this one. + + Args: + other: The object to append to this. + ignore_index: Ignore the index on appending. + verify_integrity: Verify the integrity of the index on completion. + + Returns: + A new DataFrame containing the concatenated values. + """ + if isinstance(other, (pd.Series, dict)): + if isinstance(other, dict): + other = pd.Series(other) + if other.name is None and not ignore_index: + raise TypeError('Can only append a Series if ignore_index=True' + ' or if the Series has a name') + + if other.name is None: + index = None + else: + # other must have the same index name as self, otherwise + # index name will be reset + index = pd.Index([other.name], name=self.index.name) + + combined_columns = self.columns.tolist() + self.columns.union( + other.index).difference(self.columns).tolist() + other = other.reindex(combined_columns, copy=False) + other = pd.DataFrame(other.values.reshape((1, len(other))), + index=index, + columns=combined_columns) + other = other._convert(datetime=True, timedelta=True) + elif isinstance(other, list) and not isinstance(other[0], DataFrame): + other = pd.DataFrame(other) + if (self.columns.get_indexer(other.columns) >= 0).all(): + other = other.loc[:, self.columns] + + from .concat import concat + if isinstance(other, (list, tuple)): + to_concat = [self] + other + else: + to_concat = [self, other] + return concat(to_concat, ignore_index=ignore_index, + verify_integrity=verify_integrity) def apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, args=(), **kwds): @@ -2028,9 +2069,91 @@ class DataFrame(object): def join(self, other, on=None, how='left', lsuffix='', rsuffix='', sort=False): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Join two or more DataFrames, or a DataFrame with a collection. + + Args: + other: What to join this DataFrame with. + on: A column name to use from the left for the join. + how: What type of join to conduct. + lsuffix: The suffix to add to column names that match on left. + rsuffix: The suffix to add to column names that match on right. + sort: Whether or not to sort. + + Returns: + The joined DataFrame. + """ + + if on is not None: + raise NotImplementedError("Not yet.") + + if isinstance(other, pd.Series): + if other.name is None: + raise ValueError("Other Series must have a name") + other = DataFrame({other.name: other}) + + if isinstance(other, DataFrame): + if on is not None: + index = self[on] + else: + index = self.index + + new_index = index.join(other.index, how=how, sort=sort) + + # Joining two empty DataFrames is fast, and error checks for us. + new_column_labels = pd.DataFrame(columns=self.columns) \ + .join(pd.DataFrame(columns=other.columns), + lsuffix=lsuffix, rsuffix=rsuffix).columns + + # Join is a concat once we have shuffled the data internally. + # We shuffle the data by computing the correct order. + # Another important thing to note: We set the current self index + # to the index variable which may be 'on'. + new_self = [_reindex_helper.remote(col, index, new_index, 1) + for col in self._col_partitions] + new_other = [_reindex_helper.remote(col, other.index, new_index, 1) + for col in other._col_partitions] + + # Append the columns together (i.e. concat) + new_column_parts = new_self + new_other + + # Default index in the case that on is set. + if on is not None: + new_index = None + + # TODO join the two metadata tables for performance. + return DataFrame(col_partitions=new_column_parts, + index=new_index, + columns=new_column_labels) + else: + # This constraint carried over from Pandas. + if on is not None: + raise ValueError("Joining multiple DataFrames only supported" + " for joining on index") + + # Joining the empty DataFrames with either index or columns is + # fast. It gives us proper error checking for the edge cases that + # would otherwise require a lot more logic. + new_index = pd.DataFrame(index=self.index).join( + [pd.DataFrame(index=obj.index) for obj in other], + how=how, sort=sort).index + + new_column_labels = pd.DataFrame(columns=self.columns).join( + [pd.DataFrame(columns=obj.columns) for obj in other], + lsuffix=lsuffix, rsuffix=rsuffix).columns + + new_self = [_reindex_helper.remote(col, self.index, new_index, 1) + for col in self._col_partitions] + + new_others = [_reindex_helper.remote(col, obj.index, new_index, 1) + for obj in other for col in obj._col_partitions] + + # Append the columns together (i.e. concat) + new_column_parts = new_self + new_others + + # TODO join the two metadata tables for performance. + return DataFrame(col_partitions=new_column_parts, + index=new_index, + columns=new_column_labels) def kurt(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): diff --git a/python/ray/dataframe/test/test_concat.py b/python/ray/dataframe/test/test_concat.py index 8ea3fe98c..62e881d05 100644 --- a/python/ray/dataframe/test/test_concat.py +++ b/python/ray/dataframe/test/test_concat.py @@ -3,8 +3,8 @@ from __future__ import division from __future__ import print_function import pytest -import pandas as pd -import ray.dataframe as rdf +import pandas +import ray.dataframe as pd from ray.dataframe.utils import ( to_pandas, from_pandas @@ -17,25 +17,34 @@ def ray_df_equals_pandas(ray_df, pandas_df): @pytest.fixture -def ray_df_equals(ray_df1, ray_df2): - return to_pandas(ray_df1).sort_index().equals( - to_pandas(ray_df2).sort_index() - ) +def generate_dfs(): + df = pandas.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col4': [12, 13, 14, 15], + 'col5': [0, 0, 0, 0]}) + + df2 = pandas.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col6': [12, 13, 14, 15], + 'col7': [0, 0, 0, 0]}) + return df, df2 @pytest.fixture -def generate_dfs(): - df = pd.DataFrame({'col1': [0, 1, 2, 3], - 'col2': [4, 5, 6, 7], - 'col3': [8, 9, 10, 11], - 'col4': [12, 13, 14, 15], - 'col5': [0, 0, 0, 0]}) +def generate_none_dfs(): + df = pandas.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, None, 7], + 'col3': [8, 9, 10, 11], + 'col4': [12, 13, 14, 15], + 'col5': [None, None, None, None]}) - df2 = pd.DataFrame({'col1': [0, 1, 2, 3], - 'col2': [4, 5, 6, 7], - 'col3': [8, 9, 10, 11], - 'col6': [12, 13, 14, 15], - 'col7': [0, 0, 0, 0]}) + df2 = pandas.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col6': [12, 13, 14, 15], + 'col7': [0, 0, 0, 0]}) return df, df2 @@ -43,40 +52,41 @@ def generate_dfs(): def test_df_concat(): df, df2 = generate_dfs() - assert(ray_df_equals_pandas(rdf.concat([df, df2]), pd.concat([df, df2]))) + assert(ray_df_equals_pandas(pd.concat([df, df2]), + pandas.concat([df, df2]))) def test_ray_concat(): df, df2 = generate_dfs() ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) - assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2]), - pd.concat([df, df2]))) + assert ray_df_equals_pandas(pd.concat([ray_df, ray_df2]), + pandas.concat([df, df2])) def test_ray_concat_on_index(): df, df2 = generate_dfs() ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) - assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis='index'), - pd.concat([df, df2], axis='index'))) + assert ray_df_equals_pandas(pd.concat([ray_df, ray_df2], axis='index'), + pandas.concat([df, df2], axis='index')) - assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis='rows'), - pd.concat([df, df2], axis='rows'))) + assert ray_df_equals_pandas(pd.concat([ray_df, ray_df2], axis='rows'), + pandas.concat([df, df2], axis='rows')) - assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis=0), - pd.concat([df, df2], axis=0))) + assert ray_df_equals_pandas(pd.concat([ray_df, ray_df2], axis=0), + pandas.concat([df, df2], axis=0)) def test_ray_concat_on_column(): df, df2 = generate_dfs() ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) - with pytest.raises(NotImplementedError): - rdf.concat([ray_df, ray_df2], axis=1) + assert ray_df_equals_pandas(pd.concat([ray_df, ray_df2], axis=1), + pandas.concat([df, df2], axis=1)) - with pytest.raises(NotImplementedError): - rdf.concat([ray_df, ray_df2], axis="columns") + assert ray_df_equals_pandas(pd.concat([ray_df, ray_df2], axis="columns"), + pandas.concat([df, df2], axis="columns")) def test_invalid_axis_errors(): @@ -84,7 +94,7 @@ def test_invalid_axis_errors(): ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) with pytest.raises(ValueError): - rdf.concat([ray_df, ray_df2], axis=2) + pd.concat([ray_df, ray_df2], axis=2) def test_mixed_concat(): @@ -93,8 +103,8 @@ def test_mixed_concat(): mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3] - assert(ray_df_equals_pandas(rdf.concat(mixed_dfs), - pd.concat([df, df2, df3]))) + assert(ray_df_equals_pandas(pd.concat(mixed_dfs), + pandas.concat([df, df2, df3]))) def test_mixed_inner_concat(): @@ -103,5 +113,15 @@ def test_mixed_inner_concat(): mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3] - with pytest.raises(NotImplementedError): - rdf.concat(mixed_dfs, join="inner") + assert(ray_df_equals_pandas(pd.concat(mixed_dfs, join='inner'), + pandas.concat([df, df2, df3], join='inner'))) + + +def test_mixed_none_concat(): + df, df2 = generate_none_dfs() + df3 = df.copy() + + mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3] + + assert(ray_df_equals_pandas(pd.concat(mixed_dfs), + pandas.concat([df, df2, df3]))) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index dc3b83d35..96c8c75ca 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -853,10 +853,21 @@ def test_any(ray_df, pd_df): def test_append(): - ray_df = create_test_dataframe() + ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) - with pytest.raises(NotImplementedError): - ray_df.append(None) + pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + ray_df2 = rdf.DataFrame({"col5": [0], "col6": [1]}) + + pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]}) + + assert ray_df_equals_pandas(ray_df.append(ray_df2), + pandas_df.append(pandas_df2)) + + with pytest.raises(ValueError): + ray_df.append(ray_df2, verify_integrity=True) @pytest.fixture @@ -1870,10 +1881,31 @@ def test_itertuples(ray_df, pandas_df): def test_join(): - ray_df = create_test_dataframe() + ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) - with pytest.raises(NotImplementedError): - ray_df.join(None) + pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + ray_df2 = rdf.DataFrame({"col5": [0], "col6": [1]}) + + pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]}) + + join_types = ["left", "right", "outer", "inner"] + for how in join_types: + ray_join = ray_df.join(ray_df2, how=how) + pandas_join = pandas_df.join(pandas_df2, how=how) + ray_df_equals_pandas(ray_join, pandas_join) + + ray_df3 = rdf.DataFrame({"col7": [1, 2, 3, 5, 6, 7, 8]}) + + pandas_df3 = pd.DataFrame({"col7": [1, 2, 3, 5, 6, 7, 8]}) + + join_types = ["left", "outer", "inner"] + for how in join_types: + ray_join = ray_df.join([ray_df2, ray_df3], how=how) + pandas_join = pandas_df.join([pandas_df2, pandas_df3], how=how) + ray_df_equals_pandas(ray_join, pandas_join) def test_kurt(): diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 10382984c..3aee593bd 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -282,3 +282,27 @@ def _inherit_docstrings(parent): return cls return decorator + + +@ray.remote +def _reindex_helper(df, old_index, new_index, axis): + """Reindexes a dataframe to prepare for join/concat. + + Args: + df: The DataFrame partition + old_index: The index/column for this partition. + new_index: The new index/column to assign. + axis: Which axis to reindex over. + + Returns: + A new reindexed DataFrame. + """ + if axis == 1: + df.index = old_index + df = df.reindex(new_index, copy=False) + df.reset_index(inplace=True, drop=True) + elif axis == 0: + df.columns = old_index + df = df.reindex(columns=new_index, copy=False) + df.columns = pd.RangeIndex(len(df.columns)) + return df