From efeaacbedc6f6958070d0a485370eb75698c894a Mon Sep 17 00:00:00 2001 From: adgirish Date: Mon, 9 Apr 2018 21:36:24 -0700 Subject: [PATCH] Adding support for concat (#1739) adding tests fixing flake8 adding init flake 8 on test fixing tests, imports, and flake8 handling for index adding tests for row, index added more robust error handling for axis fixing test failures cleaning up error sfor 2.7 updating travis resolving import fixing flake8 moved import order Fixing to refactor and delaying implementing ray-pd inner concat resolving ray-pd concat and from_pandas mutation Revert "resolving ray-pd concat and from_pandas mutation" This reverts commit 5db43e4e89e328286532f3ef98a4526575c5d08d. --- .travis.yml | 1 + python/ray/dataframe/__init__.py | 3 +- python/ray/dataframe/concat.py | 90 +++++++++++++++++++ python/ray/dataframe/test/test_concat.py | 107 +++++++++++++++++++++++ python/ray/dataframe/test/test_io.py | 5 +- 5 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 python/ray/dataframe/concat.py create mode 100644 python/ray/dataframe/test/test_concat.py diff --git a/.travis.yml b/.travis.yml index c07b8a043..acf476734 100644 --- a/.travis.yml +++ b/.travis.yml @@ -142,6 +142,7 @@ script: # ray dataframe tests - python -m pytest python/ray/dataframe/test/test_dataframe.py - python -m pytest python/ray/dataframe/test/test_series.py + - python -m pytest python/ray/dataframe/test/test_concat.py # ray tune tests - python python/ray/tune/test/dependency_test.py diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index bafb7be14..5fa837db7 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -30,9 +30,10 @@ def get_npartitions(): from .dataframe import DataFrame # noqa: 402 from .series import Series # noqa: 402 from .io import (read_csv, read_parquet) # noqa: 402 +from .concat import concat # noqa: 402 __all__ = [ - "DataFrame", "Series", "read_csv", "read_parquet" + "DataFrame", "Series", "read_csv", "read_parquet", "concat" ] try: diff --git a/python/ray/dataframe/concat.py b/python/ray/dataframe/concat.py new file mode 100644 index 000000000..3271bdb7f --- /dev/null +++ b/python/ray/dataframe/concat.py @@ -0,0 +1,90 @@ +import pandas as pd +import numpy as np +from .dataframe import DataFrame as rdf +from .utils import ( + from_pandas, + _deploy_func) +from functools import reduce + + +def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False, + keys=None, levels=None, names=None, verify_integrity=False, + copy=True): + + def _concat(frame1, frame2): + # Check type on objects + # Case 1: Both are Pandas DF + if isinstance(frame1, pd.DataFrame) and \ + isinstance(frame2, pd.DataFrame): + + return pd.concat((frame1, frame2), axis, join, join_axes, + ignore_index, keys, levels, names, + verify_integrity, copy) + + if not (isinstance(frame1, rdf) and + isinstance(frame2, rdf)) and join == 'inner': + raise NotImplementedError( + "Obj as dicts not implemented. To contribute to " + "Pandas on Ray, please visit github.com/ray-project/ray." + ) + + # Case 2: Both are different types + if isinstance(frame1, pd.DataFrame): + frame1 = from_pandas(frame1, len(frame1) / 2**16 + 1) + if isinstance(frame2, pd.DataFrame): + frame2 = from_pandas(frame2, len(frame2) / 2**16 + 1) + + # Case 3: Both are Ray DF + if isinstance(frame1, rdf) and \ + isinstance(frame2, rdf): + + new_columns = frame1.columns.join(frame2.columns, how=join) + + def _reindex_helper(pdf, old_columns, join): + pdf.columns = old_columns + if join == 'outer': + pdf = pdf.reindex(columns=new_columns) + else: + pdf = pdf[new_columns] + pdf.columns = pd.RangeIndex(len(new_columns)) + + return pdf + + f1_columns, f2_columns = frame1.columns, frame2.columns + new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p, + f1_columns, join), part) for + part in frame1._row_partitions] + new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p, + f2_columns, join), part) for + part in frame2._row_partitions] + + return rdf(row_partitions=new_f1 + new_f2, columns=new_columns, + index=frame1.index.append(frame2.index)) + + # (TODO) Group all the pandas dataframes + + if isinstance(objs, dict): + raise NotImplementedError( + "Obj as dicts not implemented. To contribute to " + "Pandas on Ray, please visit github.com/ray-project/ray." + ) + + axis = pd.DataFrame()._get_axis_number(axis) + if axis == 1: + raise NotImplementedError( + "Concat not implemented for axis=1. To contribute to " + "Pandas on Ray, please visit github.com/ray-project/ray." + ) + + all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs]) + if all_pd: + result = pd.concat(objs, axis, join, join_axes, + ignore_index, keys, levels, names, + verify_integrity, copy) + else: + result = reduce(_concat, objs) + + if isinstance(result, pd.DataFrame): + return from_pandas(result, len(result) / 2**16 + 1) + + return result diff --git a/python/ray/dataframe/test/test_concat.py b/python/ray/dataframe/test/test_concat.py new file mode 100644 index 000000000..8ea3fe98c --- /dev/null +++ b/python/ray/dataframe/test/test_concat.py @@ -0,0 +1,107 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import pandas as pd +import ray.dataframe as rdf +from ray.dataframe.utils import ( + to_pandas, + from_pandas +) + + +@pytest.fixture +def ray_df_equals_pandas(ray_df, pandas_df): + return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) + + +@pytest.fixture +def ray_df_equals(ray_df1, ray_df2): + return to_pandas(ray_df1).sort_index().equals( + to_pandas(ray_df2).sort_index() + ) + + +@pytest.fixture +def generate_dfs(): + df = pd.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col4': [12, 13, 14, 15], + 'col5': [0, 0, 0, 0]}) + + df2 = pd.DataFrame({'col1': [0, 1, 2, 3], + 'col2': [4, 5, 6, 7], + 'col3': [8, 9, 10, 11], + 'col6': [12, 13, 14, 15], + 'col7': [0, 0, 0, 0]}) + return df, df2 + + +@pytest.fixture +def test_df_concat(): + df, df2 = generate_dfs() + + assert(ray_df_equals_pandas(rdf.concat([df, df2]), pd.concat([df, df2]))) + + +def test_ray_concat(): + df, df2 = generate_dfs() + ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) + + assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2]), + pd.concat([df, df2]))) + + +def test_ray_concat_on_index(): + df, df2 = generate_dfs() + ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) + + assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis='index'), + pd.concat([df, df2], axis='index'))) + + assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis='rows'), + pd.concat([df, df2], axis='rows'))) + + assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis=0), + pd.concat([df, df2], axis=0))) + + +def test_ray_concat_on_column(): + df, df2 = generate_dfs() + ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) + + with pytest.raises(NotImplementedError): + rdf.concat([ray_df, ray_df2], axis=1) + + with pytest.raises(NotImplementedError): + rdf.concat([ray_df, ray_df2], axis="columns") + + +def test_invalid_axis_errors(): + df, df2 = generate_dfs() + ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2) + + with pytest.raises(ValueError): + rdf.concat([ray_df, ray_df2], axis=2) + + +def test_mixed_concat(): + df, df2 = generate_dfs() + df3 = df.copy() + + mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3] + + assert(ray_df_equals_pandas(rdf.concat(mixed_dfs), + pd.concat([df, df2, df3]))) + + +def test_mixed_inner_concat(): + df, df2 = generate_dfs() + df3 = df.copy() + + mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3] + + with pytest.raises(NotImplementedError): + rdf.concat(mixed_dfs, join="inner") diff --git a/python/ray/dataframe/test/test_io.py b/python/ray/dataframe/test/test_io.py index 0f2cc57ac..74aab1e2f 100644 --- a/python/ray/dataframe/test/test_io.py +++ b/python/ray/dataframe/test/test_io.py @@ -5,10 +5,11 @@ from __future__ import print_function import pytest import numpy as np import pandas as pd -import ray.dataframe as rdf import ray.dataframe.io as io import os +from ray.dataframe.utils import to_pandas + TEST_PARQUET_FILENAME = 'test.parquet' TEST_CSV_FILENAME = 'test.csv' SMALL_ROW_SIZE = 2000 @@ -17,7 +18,7 @@ LARGE_ROW_SIZE = 7e6 @pytest.fixture def ray_df_equals_pandas(ray_df, pandas_df): - return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) + return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index()) @pytest.fixture