[DataFrame] Implements mode, to_datetime, and get_dummies (#1956)

* implement mode and fix getitem

* mode broken on misaligned partitions

* fully implement mode

* implement to_datetime

* implement get_dummies

* implement tests

* fix __getitem__

* fix python2 compatibility

* fix getitem bug

* resolving comments

* Adding documentation

* resolving comment

* resolve name change

* speeding up getitem

* complete rebase
This commit is contained in:
Kunal Gosar
2018-05-02 23:21:00 -07:00
committed by Devin Petersohn
parent d67b786291
commit d85ee0bc04
5 changed files with 260 additions and 13 deletions
+5 -2
View File
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import pandas as pd
from pandas import (eval, Panel, date_range, MultiIndex)
from pandas import (eval, unique, value_counts, Panel, date_range, MultiIndex)
import threading
pd_version = pd.__version__
@@ -35,10 +35,13 @@ from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402
read_msgpack, read_stata, read_sas, read_pickle, # noqa: 402
read_sql) # noqa: 402
from .concat import concat # noqa: 402
from .datetimes import to_datetime # noqa: 402
from .reshape import get_dummies # noqa: 402
__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
"Panel", "date_range", "MultiIndex"
"unique", "value_counts", "to_datetime", "get_dummies", "Panel",
"date_range", "MultiIndex"
]
try:
+39 -6
View File
@@ -2620,9 +2620,44 @@ class DataFrame(object):
fill_value)
def mode(self, axis=0, numeric_only=False):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Perform mode across the DataFrame.
Args:
axis (int): The axis to take the mode on.
numeric_only (bool): if True, only apply to numeric columns.
Returns:
DataFrame: The mode of the DataFrame.
"""
axis = pd.DataFrame()._get_axis_number(axis)
def mode_helper(df):
mode_df = df.mode(axis=axis, numeric_only=numeric_only)
return mode_df, mode_df.shape[axis]
def fix_length(df, *lengths):
max_len = max(lengths[0])
df = df.reindex(pd.RangeIndex(max_len), axis=axis)
return df
parts = self._col_partitions if axis == 0 else self._row_partitions
result = [_deploy_func._submit(args=(lambda df: mode_helper(df),
part), num_return_vals=2)
for part in parts]
parts, lengths = [list(t) for t in zip(*result)]
parts = [_deploy_func.remote(
lambda df, *l: fix_length(df, l), part, *lengths)
for part in parts]
if axis == 0:
return DataFrame(col_partitions=parts,
columns=self.columns)
else:
return DataFrame(row_partitions=parts,
index=self.index)
def mul(self, other, axis='columns', level=None, fill_value=None):
"""Multiplies this DataFrame against another DataFrame/Series/scalar.
@@ -3922,9 +3957,7 @@ class DataFrame(object):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = [self.columns.index(new_col)
for new_col in columns]
indices_for_rows = [col for col in self.col if col in set(columns)]
new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
+64
View File
@@ -0,0 +1,64 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pandas
import ray
from .dataframe import DataFrame
from .utils import _map_partitions
def to_datetime(arg, errors='raise', dayfirst=False, yearfirst=False, utc=None,
box=True, format=None, exact=True, unit=None,
infer_datetime_format=False, origin='unix'):
"""Convert the arg to datetime format. If not Ray DataFrame, this falls
back on pandas.
Args:
errors ('raise' or 'ignore'): If 'ignore', errors are silenced.
dayfirst (bool): Date format is passed in as day first.
yearfirst (bool): Date format is passed in as year first.
utc (bool): retuns a UTC DatetimeIndex if True.
box (bool): If True, returns a DatetimeIndex.
format (string): strftime to parse time, eg "%d/%m/%Y".
exact (bool): If True, require an exact format match.
unit (string, default 'ns'): unit of the arg.
infer_datetime_format (bool): Whether or not to infer the format.
origin (string): Define the reference date.
Returns:
Type depends on input:
- list-like: DatetimeIndex
- Series: Series of datetime64 dtype
- scalar: Timestamp
"""
if not isinstance(arg, DataFrame):
return pandas.to_datetime(arg, errors=errors, dayfirst=dayfirst,
yearfirst=yearfirst, utc=utc, box=box,
format=format, exact=exact, unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)
if errors == 'raise':
pandas.to_datetime(pandas.DataFrame(columns=arg.columns),
errors=errors, dayfirst=dayfirst,
yearfirst=yearfirst, utc=utc, box=box,
format=format, exact=exact, unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)
def datetime_helper(df, cols):
df.columns = cols
return pandas.to_datetime(df, errors=errors, dayfirst=dayfirst,
yearfirst=yearfirst, utc=utc, box=box,
format=format, exact=exact, unit=unit,
infer_datetime_format=infer_datetime_format,
origin=origin)
datetime_series = _map_partitions(datetime_helper, arg._row_partitions,
arg.columns)
result = pandas.concat(ray.get(datetime_series), copy=False)
result.index = arg.index
return result
+125
View File
@@ -0,0 +1,125 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import ray
import pandas
import numpy as np
from pandas import compat
from pandas.core.dtypes.common import is_list_like
from itertools import cycle
from .dataframe import DataFrame
from .utils import _deploy_func
def get_dummies(data, prefix=None, prefix_sep='_', dummy_na=False,
columns=None, sparse=False, drop_first=False):
"""Convert categorical variable into indicator variables.
Args:
data (array-like, Series, or DataFrame): data to encode.
prefix (string, [string]): Prefix to apply to each encoded column
label.
prefix_sep (string, [string]): Separator between prefix and value.
dummy_na (bool): Add a column to indicate NaNs.
columns: Which columns to encode.
sparse (bool): Not Implemented: If True, returns SparseDataFrame.
drop_first (bool): Whether to remove the first level of encoded data.
Returns:
DataFrame or one-hot encoded data.
"""
if not isinstance(data, DataFrame):
return pandas.get_dummies(data, prefix=prefix, prefix_sep=prefix_sep,
dummy_na=dummy_na, columns=columns,
sparse=sparse, drop_first=drop_first)
if sparse:
raise NotImplementedError(
"SparseDataFrame is not implemented. "
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
if columns is None:
columns_to_encode = data.dtypes.isin([np.dtype("O"), 'category'])
columns_to_encode = data.columns[columns_to_encode]
else:
columns_to_encode = columns
def check_len(item, name):
len_msg = ("Length of '{name}' ({len_item}) did not match the "
"length of the columns being encoded ({len_enc}).")
if is_list_like(item):
if not len(item) == len(columns_to_encode):
len_msg = len_msg.format(name=name, len_item=len(item),
len_enc=len(columns_to_encode))
raise ValueError(len_msg)
check_len(prefix, 'prefix')
check_len(prefix_sep, 'prefix_sep')
if isinstance(prefix, compat.string_types):
prefix = cycle([prefix])
prefix = [next(prefix) for i in range(len(columns_to_encode))]
if isinstance(prefix, dict):
prefix = [prefix[col] for col in columns_to_encode]
if prefix is None:
prefix = columns_to_encode
# validate separators
if isinstance(prefix_sep, compat.string_types):
prefix_sep = cycle([prefix_sep])
prefix_sep = [next(prefix_sep) for i in range(len(columns_to_encode))]
elif isinstance(prefix_sep, dict):
prefix_sep = [prefix_sep[col] for col in columns_to_encode]
if set(columns_to_encode) == set(data.columns):
with_dummies = []
dropped_columns = pandas.Index()
else:
with_dummies = data.drop(columns_to_encode, axis=1)._col_partitions
dropped_columns = data.columns.drop(columns_to_encode)
def get_dummies_remote(df, to_drop, prefix, prefix_sep):
df = df.drop(to_drop, axis=1)
if df.size == 0:
return df, df.columns
df = pandas.get_dummies(df, prefix=prefix, prefix_sep=prefix_sep,
dummy_na=dummy_na, columns=None, sparse=sparse,
drop_first=drop_first)
columns = df.columns
df.columns = pandas.RangeIndex(0, len(df.columns))
return df, columns
total = 0
columns = []
for i, part in enumerate(data._col_partitions):
col_index = data._col_metadata.partition_series(i)
# TODO(kunalgosar): Handle the case of duplicate columns here
to_encode = col_index.index.isin(columns_to_encode)
to_encode = col_index[to_encode]
to_drop = col_index.drop(to_encode.index)
result = _deploy_func._submit(
args=(get_dummies_remote, part, to_drop,
prefix[total:total + len(to_encode)],
prefix_sep[total:total + len(to_encode)]),
num_return_vals=2)
with_dummies.append(result[0])
columns.append(result[1])
total += len(to_encode)
columns = ray.get(columns)
dropped_columns = dropped_columns.append(columns)
return DataFrame(col_partitions=with_dummies,
columns=dropped_columns,
index=data.index)
+27 -5
View File
@@ -2106,11 +2106,11 @@ def test_mod():
test_inter_df_math("mod", simple=False)
def test_mode():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.mode()
@pytest.fixture
def test_mode(ray_df, pandas_df):
assert(ray_series_equals_pandas(ray_df.mode(), pandas_df.mode()))
assert(ray_series_equals_pandas(ray_df.mode(axis=1),
pandas_df.mode(axis=1)))
def test_mul():
@@ -3105,3 +3105,25 @@ def test__doc__():
pd_obj = getattr(pd.DataFrame, attr, None)
if callable(pd_obj) or isinstance(pd_obj, property):
assert obj.__doc__ == pd_obj.__doc__
def test_to_datetime():
ray_df = rdf.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})
pd_df = pd.DataFrame({'year': [2015, 2016],
'month': [2, 3],
'day': [4, 5]})
rdf.to_datetime(ray_df).equals(pd.to_datetime(pd_df))
def test_get_dummies():
ray_df = rdf.DataFrame({'A': ['a', 'b', 'a'],
'B': ['b', 'a', 'c'],
'C': [1, 2, 3]})
pd_df = pd.DataFrame({'A': ['a', 'b', 'a'],
'B': ['b', 'a', 'c'],
'C': [1, 2, 3]})
ray_df_equals_pandas(rdf.get_dummies(ray_df), pd.get_dummies(pd_df))