Adding support for concat (#1739)

adding tests

fixing flake8

adding init

flake 8 on test

fixing tests, imports, and flake8

handling for index

adding tests for row, index

added more robust error handling for axis

fixing test failures

cleaning up errors for 2.7

updating travis

resolving import

fixing flake8

moved import order

Fixing to refactor and delaying implementing ray-pd inner concat

resolving ray-pd concat and from_pandas mutation

Revert "resolving ray-pd concat and from_pandas mutation"

This reverts commit 5db43e4e89e328286532f3ef98a4526575c5d08d.
This commit is contained in:
adgirish
2018-04-09 21:36:24 -07:00
committed by Devin Petersohn
parent 3039cca242
commit efeaacbedc
5 changed files with 203 additions and 3 deletions
+2 -1
View File
@@ -30,9 +30,10 @@ def get_npartitions():
from .dataframe import DataFrame # noqa: 402
from .series import Series # noqa: 402
from .io import (read_csv, read_parquet) # noqa: 402
from .concat import concat # noqa: 402
__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet"
"DataFrame", "Series", "read_csv", "read_parquet", "concat"
]
try:
+90
View File
@@ -0,0 +1,90 @@
import pandas as pd
import numpy as np
from .dataframe import DataFrame as rdf
from .utils import (
from_pandas,
_deploy_func)
from functools import reduce
def concat(objs, axis=0, join='outer', join_axes=None, ignore_index=False,
           keys=None, levels=None, names=None, verify_integrity=False,
           copy=True):
    """Concatenate pandas and/or Ray DataFrames along the row axis.

    Args:
        objs: Iterable of pandas DataFrames and/or Ray DataFrames. A dict is
            rejected with NotImplementedError.
        axis: Axis to concatenate along. Only the row axis (anything pandas
            resolves to axis number 0) is implemented.
        join: Passed as ``how`` when joining the column labels of two Ray
            DataFrames. 'outer' reindexes every partition to the joined
            columns; any other value selects the joined columns.
        join_axes, ignore_index, keys, levels, names, verify_integrity, copy:
            Forwarded to ``pandas.concat`` when pandas DataFrames are
            concatenated with each other. They are not consulted when Ray
            DataFrames are combined.

    Returns:
        The concatenated frame. A pandas result is converted with
        ``from_pandas`` before being returned.

    Raises:
        NotImplementedError: If ``objs`` is a dict, if ``axis`` resolves to
            1, or if ``join == 'inner'`` and a pair being combined is not
            two Ray DataFrames (and not two pandas DataFrames).
        ValueError: Propagated from the pandas axis lookup for an
            unrecognized ``axis``.
    """
    if isinstance(objs, dict):
        raise NotImplementedError(
            "Obj as dicts not implemented. To contribute to "
            "Pandas on Ray, please visit github.com/ray-project/ray."
        )

    # Normalize 'index' / 'rows' / 'columns' spellings to an axis number.
    axis = pd.DataFrame()._get_axis_number(axis)
    if axis == 1:
        raise NotImplementedError(
            "Concat not implemented for axis=1. To contribute to "
            "Pandas on Ray, please visit github.com/ray-project/ray."
        )

    # objs is walked more than once below; materialize it so that
    # generators and other one-shot iterables are not silently exhausted.
    objs = list(objs)

    # Keyword form keeps each option bound to the right pandas parameter
    # regardless of how the positional order of pandas.concat evolves.
    pandas_kwargs = dict(axis=axis, join=join, join_axes=join_axes,
                         ignore_index=ignore_index, keys=keys, levels=levels,
                         names=names, verify_integrity=verify_integrity,
                         copy=copy)

    def _npartitions_for(pandas_df):
        # Floor division: this module does not enable true division from
        # __future__, so `/` would be an int on Python 2 but a float on
        # Python 3. Partition counts must be whole numbers on both.
        return len(pandas_df) // 2**16 + 1

    def _concat(frame1, frame2):
        # Case 1: both are pandas DataFrames; defer entirely to pandas.
        if isinstance(frame1, pd.DataFrame) and \
                isinstance(frame2, pd.DataFrame):
            return pd.concat((frame1, frame2), **pandas_kwargs)

        if not (isinstance(frame1, rdf) and
                isinstance(frame2, rdf)) and join == 'inner':
            raise NotImplementedError(
                "Inner join concat of mixed pandas and Ray DataFrames not "
                "implemented. To contribute to "
                "Pandas on Ray, please visit github.com/ray-project/ray."
            )

        # Case 2: mixed types; promote the pandas side to a Ray DataFrame.
        if isinstance(frame1, pd.DataFrame):
            frame1 = from_pandas(frame1, _npartitions_for(frame1))
        if isinstance(frame2, pd.DataFrame):
            frame2 = from_pandas(frame2, _npartitions_for(frame2))

        # Case 3: both are Ray DataFrames.
        if isinstance(frame1, rdf) and \
                isinstance(frame2, rdf):
            new_columns = frame1.columns.join(frame2.columns, how=join)

            def _reindex_helper(pdf, old_columns, join):
                # Restore the real labels, align to the joined columns,
                # then go back to positional column labels.
                pdf.columns = old_columns
                if join == 'outer':
                    pdf = pdf.reindex(columns=new_columns)
                else:
                    pdf = pdf[new_columns]
                pdf.columns = pd.RangeIndex(len(new_columns))
                return pdf

            f1_columns, f2_columns = frame1.columns, frame2.columns
            new_f1 = [_deploy_func.remote(lambda p: _reindex_helper(p,
                      f1_columns, join), part) for
                      part in frame1._row_partitions]
            new_f2 = [_deploy_func.remote(lambda p: _reindex_helper(p,
                      f2_columns, join), part) for
                      part in frame2._row_partitions]

            return rdf(row_partitions=new_f1 + new_f2, columns=new_columns,
                       index=frame1.index.append(frame2.index))
        # NOTE(review): any other type combination falls through and
        # yields None, as before -- confirm whether that should raise.

    # (TODO) Group all the pandas dataframes

    if all(isinstance(obj, pd.DataFrame) for obj in objs):
        result = pd.concat(objs, **pandas_kwargs)
    else:
        result = reduce(_concat, objs)

    if isinstance(result, pd.DataFrame):
        return from_pandas(result, _npartitions_for(result))

    return result
+107
View File
@@ -0,0 +1,107 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import pandas as pd
import ray.dataframe as rdf
from ray.dataframe.utils import (
to_pandas,
from_pandas
)
def ray_df_equals_pandas(ray_df, pandas_df):
    """Return True when a Ray DataFrame holds the same data as a pandas one.

    This is a plain comparison helper that the tests call directly, so it is
    deliberately not registered as a pytest fixture (pytest refuses direct
    calls to fixture functions, and ``ray_df`` / ``pandas_df`` are ordinary
    arguments, not fixtures).

    Both sides are sorted by index before comparing.
    """
    return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
def ray_df_equals(ray_df1, ray_df2):
    """Return True when two Ray DataFrames hold the same data.

    This is a plain comparison helper meant to be called directly, so it is
    deliberately not registered as a pytest fixture (pytest refuses direct
    calls to fixture functions, and its arguments are not fixtures).

    Both frames are converted with ``to_pandas`` and sorted by index before
    comparing.
    """
    return to_pandas(ray_df1).sort_index().equals(
        to_pandas(ray_df2).sort_index()
    )
def generate_dfs():
    """Build two small pandas DataFrames with partially overlapping columns.

    The frames share ``col1``-``col3``; the first additionally has ``col4``
    and ``col5``, the second ``col6`` and ``col7``.

    The tests call this helper directly, so it is a plain function rather
    than a pytest fixture (pytest refuses direct calls to fixture functions).

    Returns:
        Tuple ``(df, df2)`` of pandas DataFrames, four rows each.
    """
    df = pd.DataFrame({'col1': [0, 1, 2, 3],
                       'col2': [4, 5, 6, 7],
                       'col3': [8, 9, 10, 11],
                       'col4': [12, 13, 14, 15],
                       'col5': [0, 0, 0, 0]})

    df2 = pd.DataFrame({'col1': [0, 1, 2, 3],
                        'col2': [4, 5, 6, 7],
                        'col3': [8, 9, 10, 11],
                        'col6': [12, 13, 14, 15],
                        'col7': [0, 0, 0, 0]})
    return df, df2
def test_df_concat():
    """Concatenating two pandas DataFrames through rdf.concat matches pandas.

    This must be a plain test function: decorated as a fixture it would
    never be collected or executed by pytest.
    """
    df, df2 = generate_dfs()

    assert(ray_df_equals_pandas(rdf.concat([df, df2]), pd.concat([df, df2])))
def test_ray_concat():
    """Concatenating two Ray DataFrames matches pandas on the same data."""
    pandas_a, pandas_b = generate_dfs()
    ray_a = from_pandas(pandas_a, 2)
    ray_b = from_pandas(pandas_b, 2)

    actual = rdf.concat([ray_a, ray_b])
    expected = pd.concat([pandas_a, pandas_b])
    assert ray_df_equals_pandas(actual, expected)
def test_ray_concat_on_index():
    """Every spelling of the row axis gives the same result as pandas."""
    pandas_a, pandas_b = generate_dfs()
    ray_a = from_pandas(pandas_a, 2)
    ray_b = from_pandas(pandas_b, 2)

    # 'index', 'rows' and 0 all name the row axis.
    for row_axis in ('index', 'rows', 0):
        actual = rdf.concat([ray_a, ray_b], axis=row_axis)
        expected = pd.concat([pandas_a, pandas_b], axis=row_axis)
        assert ray_df_equals_pandas(actual, expected)
def test_ray_concat_on_column():
    """Column-wise concat is rejected with NotImplementedError."""
    pandas_a, pandas_b = generate_dfs()
    ray_a = from_pandas(pandas_a, 2)
    ray_b = from_pandas(pandas_b, 2)

    # Both the numeric and the named spelling of the column axis must fail.
    for column_axis in (1, "columns"):
        with pytest.raises(NotImplementedError):
            rdf.concat([ray_a, ray_b], axis=column_axis)
def test_invalid_axis_errors():
    """An axis that does not exist on a DataFrame raises ValueError."""
    pandas_a, pandas_b = generate_dfs()
    ray_frames = [from_pandas(pandas_a, 2), from_pandas(pandas_b, 2)]

    with pytest.raises(ValueError):
        rdf.concat(ray_frames, axis=2)
def test_mixed_concat():
    """A mix of Ray and pandas DataFrames concatenates like pure pandas."""
    pandas_a, pandas_b = generate_dfs()
    pandas_c = pandas_a.copy()

    # Two Ray frames followed by one plain pandas frame.
    mixed_frames = [from_pandas(pandas_a, 2),
                    from_pandas(pandas_b, 2),
                    pandas_c]

    actual = rdf.concat(mixed_frames)
    expected = pd.concat([pandas_a, pandas_b, pandas_c])
    assert ray_df_equals_pandas(actual, expected)
def test_mixed_inner_concat():
    """Inner-join concat of mixed Ray/pandas input is not implemented."""
    pandas_a, pandas_b = generate_dfs()
    pandas_c = pandas_a.copy()

    # Two Ray frames followed by one plain pandas frame.
    mixed_frames = [from_pandas(pandas_a, 2),
                    from_pandas(pandas_b, 2),
                    pandas_c]

    with pytest.raises(NotImplementedError):
        rdf.concat(mixed_frames, join="inner")
+3 -2
View File
@@ -5,10 +5,11 @@ from __future__ import print_function
import pytest
import numpy as np
import pandas as pd
import ray.dataframe as rdf
import ray.dataframe.io as io
import os
from ray.dataframe.utils import to_pandas
TEST_PARQUET_FILENAME = 'test.parquet'
TEST_CSV_FILENAME = 'test.csv'
SMALL_ROW_SIZE = 2000
@@ -17,7 +18,7 @@ LARGE_ROW_SIZE = 7e6
@pytest.fixture
def ray_df_equals_pandas(ray_df, pandas_df):
return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
return to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
@pytest.fixture