diff --git a/.travis.yml b/.travis.yml
index c07b8a043..acf476734 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -142,6 +142,7 @@ script:
   # ray dataframe tests
   - python -m pytest python/ray/dataframe/test/test_dataframe.py
   - python -m pytest python/ray/dataframe/test/test_series.py
+  - python -m pytest python/ray/dataframe/test/test_concat.py
 
   # ray tune tests
   - python python/ray/tune/test/dependency_test.py
diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py
index bafb7be14..5fa837db7 100644
--- a/python/ray/dataframe/__init__.py
+++ b/python/ray/dataframe/__init__.py
@@ -30,9 +30,10 @@ def get_npartitions():
 from .dataframe import DataFrame  # noqa: 402
 from .series import Series  # noqa: 402
 from .io import (read_csv, read_parquet)  # noqa: 402
+from .concat import concat  # noqa: 402
 
 __all__ = [
-    "DataFrame", "Series", "read_csv", "read_parquet"
+    "DataFrame", "Series", "read_csv", "read_parquet", "concat"
 ]
 
 try:
diff --git a/python/ray/dataframe/concat.py b/python/ray/dataframe/concat.py
new file mode 100644
index 000000000..3271bdb7f
--- /dev/null
+++ b/python/ray/dataframe/concat.py
@@ -0,0 +1,124 @@
+import pandas as pd
+import numpy as np
+from .dataframe import DataFrame as rdf
+from .utils import (
+    from_pandas,
+    _deploy_func)
+from functools import reduce
+
+
+def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
+           keys=None, levels=None, names=None, verify_integrity=False,
+           copy=True):
+    """Concatenate Pandas and/or Ray DataFrames along the row axis.
+
+    The signature mirrors ``pandas.concat``. When every object is a Pandas
+    DataFrame the work is delegated to ``pandas.concat`` with all arguments
+    forwarded. Otherwise the objects are combined pairwise, and once a Ray
+    DataFrame is involved only ``join`` is honored: ``join_axes``,
+    ``ignore_index``, ``keys``, ``levels``, ``names``, ``verify_integrity``
+    and ``copy`` are ignored in that path.
+
+    Args:
+        objs: An iterable of Pandas and/or Ray DataFrames.
+        axis: The axis to concatenate along. Only the row axis is
+            implemented.
+        join: How to combine the columns of the inputs, 'outer' or 'inner'.
+
+    Returns:
+        The concatenated DataFrame; a Pandas result is converted with
+        ``from_pandas`` before it is returned.
+
+    Raises:
+        NotImplementedError: If objs is a dict, if axis resolves to 1, or
+            if join is 'inner' and Pandas and Ray DataFrames are mixed.
+        TypeError: If an object is neither a Pandas nor a Ray DataFrame.
+    """
+
+    def _concat(frame1, frame2):
+        # Case 1: Both are Pandas DF, so Pandas can do the work itself.
+        if isinstance(frame1, pd.DataFrame) and \
+           isinstance(frame2, pd.DataFrame):
+
+            return pd.concat((frame1, frame2), axis, join, join_axes,
+                             ignore_index, keys, levels, names,
+                             verify_integrity, copy)
+
+        if not (isinstance(frame1, rdf) and
+                isinstance(frame2, rdf)) and join == 'inner':
+            raise NotImplementedError(
+                "Inner join of mixed Pandas and Ray DataFrames not "
+                "implemented. To contribute to "
+                "Pandas on Ray, please visit github.com/ray-project/ray."
+            )
+
+        # Case 2: Both are different types. Floor division keeps the value
+        # handed to from_pandas an int on Python 3 as well as Python 2.
+        if isinstance(frame1, pd.DataFrame):
+            frame1 = from_pandas(frame1, len(frame1) // 2**16 + 1)
+        if isinstance(frame2, pd.DataFrame):
+            frame2 = from_pandas(frame2, len(frame2) // 2**16 + 1)
+
+        # Case 3: Both are Ray DF. The caller only passes Pandas and Ray
+        # DataFrames, so this always holds after the conversion above.
+        new_columns = frame1.columns.join(frame2.columns, how=join)
+
+        def _reindex_helper(pdf, old_columns, join):
+            # Label the partition with its frame's columns, align it to
+            # the joined columns, then relabel the columns positionally.
+            pdf.columns = old_columns
+            if join == 'outer':
+                pdf = pdf.reindex(columns=new_columns)
+            else:
+                pdf = pdf[new_columns]
+            pdf.columns = pd.RangeIndex(len(new_columns))
+
+            return pdf
+
+        f1_columns, f2_columns = frame1.columns, frame2.columns
+        new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p,
+                  f1_columns, join), part) for
+                  part in frame1._row_partitions]
+        new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p,
+                  f2_columns, join), part) for
+                  part in frame2._row_partitions]
+
+        return rdf(row_partitions=new_f1 + new_f2, columns=new_columns,
+                   index=frame1.index.append(frame2.index))
+
+    # (TODO) Group all the pandas dataframes
+
+    if isinstance(objs, dict):
+        raise NotImplementedError(
+            "Obj as dicts not implemented. To contribute to "
+            "Pandas on Ray, please visit github.com/ray-project/ray."
+        )
+
+    axis = pd.DataFrame()._get_axis_number(axis)
+    if axis == 1:
+        raise NotImplementedError(
+            "Concat not implemented for axis=1. To contribute to "
+            "Pandas on Ray, please visit github.com/ray-project/ray."
+        )
+
+    # Materialize objs once so that a generator is not exhausted by the
+    # type checks before it is concatenated.
+    objs = list(objs)
+    for obj in objs:
+        if not isinstance(obj, (pd.DataFrame, rdf)):
+            raise TypeError(
+                "Cannot concatenate object of type {}. Only Pandas and "
+                "Ray DataFrames are supported.".format(type(obj)))
+
+    all_pd = np.all([isinstance(obj, pd.DataFrame) for obj in objs])
+    if all_pd:
+        result = pd.concat(objs, axis, join, join_axes,
+                           ignore_index, keys, levels, names,
+                           verify_integrity, copy)
+    else:
+        result = reduce(_concat, objs)
+
+    if isinstance(result, pd.DataFrame):
+        return from_pandas(result, len(result) // 2**16 + 1)
+
+    return result
diff --git a/python/ray/dataframe/test/test_concat.py b/python/ray/dataframe/test/test_concat.py
new file mode 100644
index 000000000..8ea3fe98c
--- /dev/null
+++ b/python/ray/dataframe/test/test_concat.py
@@ -0,0 +1,105 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import pytest
+import pandas as pd
+import ray.dataframe as rdf
+from ray.dataframe.utils import (
+    to_pandas,
+    from_pandas
+)
+
+
+# Plain helper functions rather than fixtures: they are called directly with
+# arguments, which pytest does not allow for fixture functions.
+def ray_df_equals_pandas(ray_df, pandas_df):
+    return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
+
+
+def ray_df_equals(ray_df1, ray_df2):
+    return to_pandas(ray_df1).sort_index().equals(
+        to_pandas(ray_df2).sort_index()
+    )
+
+
+def generate_dfs():
+    df = pd.DataFrame({'col1': [0, 1, 2, 3],
+                       'col2': [4, 5, 6, 7],
+                       'col3': [8, 9, 10, 11],
+                       'col4': [12, 13, 14, 15],
+                       'col5': [0, 0, 0, 0]})
+
+    df2 = pd.DataFrame({'col1': [0, 1, 2, 3],
+                        'col2': [4, 5, 6, 7],
+                        'col3': [8, 9, 10, 11],
+                        'col6': [12, 13, 14, 15],
+                        'col7': [0, 0, 0, 0]})
+    return df, df2
+
+
+def test_df_concat():
+    df, df2 = generate_dfs()
+
+    assert(ray_df_equals_pandas(rdf.concat([df, df2]), pd.concat([df, df2])))
+
+
+def test_ray_concat():
+    df, df2 = generate_dfs()
+    ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2)
+
+    assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2]),
+                                pd.concat([df, df2])))
+
+
+def test_ray_concat_on_index():
+    df, df2 = generate_dfs()
+    ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2)
+
+    assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis='index'),
+                                pd.concat([df, df2], axis='index')))
+
+    assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis='rows'),
+                                pd.concat([df, df2], axis='rows')))
+
+    assert(ray_df_equals_pandas(rdf.concat([ray_df, ray_df2], axis=0),
+                                pd.concat([df, df2], axis=0)))
+
+
+def test_ray_concat_on_column():
+    df, df2 = generate_dfs()
+    ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2)
+
+    with pytest.raises(NotImplementedError):
+        rdf.concat([ray_df, ray_df2], axis=1)
+
+    with pytest.raises(NotImplementedError):
+        rdf.concat([ray_df, ray_df2], axis="columns")
+
+
+def test_invalid_axis_errors():
+    df, df2 = generate_dfs()
+    ray_df, ray_df2 = from_pandas(df, 2), from_pandas(df2, 2)
+
+    with pytest.raises(ValueError):
+        rdf.concat([ray_df, ray_df2], axis=2)
+
+
+def test_mixed_concat():
+    df, df2 = generate_dfs()
+    df3 = df.copy()
+
+    mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3]
+
+    assert(ray_df_equals_pandas(rdf.concat(mixed_dfs),
+                                pd.concat([df, df2, df3])))
+
+
+def test_mixed_inner_concat():
+    df, df2 = generate_dfs()
+    df3 = df.copy()
+
+    mixed_dfs = [from_pandas(df, 2), from_pandas(df2, 2), df3]
+
+    with pytest.raises(NotImplementedError):
+        rdf.concat(mixed_dfs, join="inner")
diff --git a/python/ray/dataframe/test/test_io.py b/python/ray/dataframe/test/test_io.py
index 0f2cc57ac..74aab1e2f 100644
--- a/python/ray/dataframe/test/test_io.py
+++ b/python/ray/dataframe/test/test_io.py
@@ -5,10 +5,11 @@ from __future__ import print_function
 import pytest
 import numpy as np
 import pandas as pd
-import ray.dataframe as rdf
 import ray.dataframe.io as io
 import os
+
+from ray.dataframe.utils import to_pandas
 
 TEST_PARQUET_FILENAME = 'test.parquet'
 TEST_CSV_FILENAME = 'test.csv'
 SMALL_ROW_SIZE = 2000
@@ -17,7 +18,7 @@ LARGE_ROW_SIZE = 7e6
 
 
 @pytest.fixture
 def ray_df_equals_pandas(ray_df, pandas_df):
-    return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
+    return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
 
 
 @pytest.fixture