From 1fa59f1887aac67868f58bd9f2d2bdf28dc22a03 Mon Sep 17 00:00:00 2001
From: Devin Petersohn
Date: Mon, 26 Feb 2018 08:58:15 -0800
Subject: [PATCH] [DataFrame] Adding insert, set_axis, set_index, reset_index and tests (#1603)

---
Notes (reviewer commentary; text between the "---" line and the first
"diff --git" line is not applied by git-am):

  * The line structure of this patch had been flattened; the indentation
    and blank lines below are reconstructed. Every hunk's line counts and
    the diffstat totals were re-checked against the "@@" headers.
  * DataFrame.pop() now assigns a plain list to self.columns, while
    DataFrame.insert() reassigns self.columns from
    self.columns.insert(loc, column). list.insert() returns None, so an
    insert() after a pop() would presumably leave self.columns as None,
    unless `columns` is a property that converts on assignment (not
    visible in this patch -- TODO confirm).
  * insert() rejects only loc < 0 or loc > len(self.columns), but its
    error message says the location "must be higher than 0"; loc == 0 is
    accepted.
  * set_index(): the branch isinstance(col, (list, np.ndarray, pd.Index))
    follows an `elif isinstance(col, pd.Index)` branch, so the pd.Index
    member of that tuple can never match there.
  * test_insert() calls insert() on both copies but makes no assertion.
  * test_reset_index() contains two print() calls.
  * test_int_dataframe() does not call test_set_index(..., inplace=True);
    the float, mixed-dtype and nan variants do.

 python/ray/dataframe/dataframe.py           | 271 +++++++++++++++++++-
 python/ray/dataframe/test/test_dataframe.py | 143 +++++++++--
 2 files changed, 384 insertions(+), 30 deletions(-)

diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py
index a300fd8e9..0db5a4e40 100644
--- a/python/ray/dataframe/dataframe.py
+++ b/python/ray/dataframe/dataframe.py
@@ -3,6 +3,14 @@ from __future__ import division
 from __future__ import print_function
 
 import pandas as pd
+from pandas.api.types import is_scalar
+from pandas.util._validators import validate_bool_kwarg
+from pandas.core.index import _ensure_index_from_sequences
+from pandas._libs import lib
+from pandas.core.dtypes.cast import maybe_upcast_putmask
+from pandas.compat import lzip
+
+import warnings
 import numpy as np
 import ray
 import itertools
@@ -792,7 +800,52 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def insert(self, loc, column, value, allow_duplicates=False):
-        raise NotImplementedError("Not Yet implemented.")
+        """Insert column into DataFrame at specified location.
+
+        Args:
+            loc (int): Insertion index. Must verify 0 <= loc <= len(columns).
+            column (hashable object): Label of the inserted column.
+            value (int, Series, or array-like): The values to insert.
+            allow_duplicates (bool): Whether to allow duplicate column names.
+        """
+        try:
+            len(value)
+        except TypeError:
+            value = [value for _ in range(len(self.index))]
+
+        if len(value) != len(self.index):
+            raise ValueError(
+                "Column length provided does not match DataFrame length.")
+        if loc < 0 or loc > len(self.columns):
+            raise ValueError(
+                "Location provided must be higher than 0 and lower than the "
+                "number of columns.")
+        if not allow_duplicates and column in self.columns:
+            raise ValueError(
+                "Column {} already exists in DataFrame.".format(column))
+
+        cumulative = np.cumsum(self._lengths)
+        partitions = [value[cumulative[i-1]:cumulative[i]]
+                      for i in range(len(cumulative))
+                      if i != 0]
+
+        partitions.insert(0, value[:cumulative[0]])
+
+        # Because insert is always inplace, we have to create this temp fn.
+        def _insert(_df, _loc, _column, _part, _allow_duplicates):
+            _df.insert(_loc, _column, _part, _allow_duplicates)
+            return _df
+
+        self._df = \
+            [_deploy_func.remote(_insert,
+                                 self._df[i],
+                                 loc,
+                                 column,
+                                 partitions[i],
+                                 allow_duplicates)
+             for i in range(len(self._df))]
+
+        self.columns = self.columns.insert(loc, column)
 
     def interpolate(self, method='linear', axis=0, limit=None, inplace=False,
                     limit_direction='forward', downcast=None, **kwargs):
@@ -1047,6 +1100,8 @@ class DataFrame(object):
         popped = to_pandas(self._map_partitions(
             lambda df: df.pop(item)))
         self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df
+        self.columns = [col for col in self.columns if col != item]
+
         return popped
 
     def pow(self, other, axis='columns', level=None, fill_value=None):
@@ -1111,7 +1166,103 @@ class DataFrame(object):
 
     def reset_index(self, level=None, drop=False, inplace=False, col_level=0,
                     col_fill=''):
-        raise NotImplementedError("Not Yet implemented.")
+        """Reset this index to default and create column from current index.
+
+        Args:
+            level: Only remove the given levels from the index. Removes all
+                levels by default
+            drop: Do not try to insert index into dataframe columns. This
+                resets the index to the default integer index.
+            inplace: Modify the DataFrame in place (do not create a new object)
+            col_level : If the columns have multiple levels, determines which
+                level the labels are inserted into. By default it is inserted
+                into the first level.
+            col_fill: If the columns have multiple levels, determines how the
+                other levels are named. If None then the index name is
+                repeated.
+
+        Returns:
+            A new DataFrame if inplace is False, None otherwise.
+        """
+        inplace = validate_bool_kwarg(inplace, 'inplace')
+        if inplace:
+            new_obj = self
+        else:
+            new_obj = self.copy()
+
+        def _maybe_casted_values(index, labels=None):
+            if isinstance(index, pd.PeriodIndex):
+                values = index.asobject.values
+            elif isinstance(index, pd.DatetimeIndex) and index.tz is not None:
+                values = index
+            else:
+                values = index.values
+                if values.dtype == np.object_:
+                    values = lib.maybe_convert_objects(values)
+
+            # if we have the labels, extract the values with a mask
+            if labels is not None:
+                mask = labels == -1
+
+                # we can have situations where the whole mask is -1,
+                # meaning there is nothing found in labels, so make all nan's
+                if mask.all():
+                    values = np.empty(len(mask))
+                    values.fill(np.nan)
+                else:
+                    values = values.take(labels)
+                    if mask.any():
+                        values, changed = maybe_upcast_putmask(
+                            values, mask, np.nan)
+            return values
+
+        new_index = new_obj._default_index().index
+        if level is not None:
+            if not isinstance(level, (tuple, list)):
+                level = [level]
+            level = [self.index._get_level_number(lev) for lev in level]
+            if isinstance(self.index, pd.MultiIndex):
+                if len(level) < self.index.nlevels:
+                    new_index = self.index.droplevel(level)
+
+        if not drop:
+            if isinstance(self.index, pd.MultiIndex):
+                names = [n if n is not None else ('level_%d' % i)
+                         for (i, n) in enumerate(self.index.names)]
+                to_insert = lzip(self.index.levels, self.index.labels)
+            else:
+                default = 'index' if 'index' not in self else 'level_0'
+                names = ([default] if self.index.name is None
+                         else [self.index.name])
+                to_insert = ((self.index, None),)
+
+            multi_col = isinstance(self.columns, pd.MultiIndex)
+            for i, (lev, lab) in reversed(list(enumerate(to_insert))):
+                if not (level is None or i in level):
+                    continue
+                name = names[i]
+                if multi_col:
+                    col_name = (list(name) if isinstance(name, tuple)
+                                else [name])
+                    if col_fill is None:
+                        if len(col_name) not in (1, self.columns.nlevels):
+                            raise ValueError("col_fill=None is incompatible "
+                                             "with incomplete column name "
+                                             "{}".format(name))
+                        col_fill = col_name[0]
+
+                    lev_num = self.columns._get_level_number(col_level)
+                    name_lst = [col_fill] * lev_num + col_name
+                    missing = self.columns.nlevels - len(name_lst)
+                    name_lst += [col_fill] * missing
+                    name = tuple(name_lst)
+                # to ndarray and maybe infer different dtype
+                level_values = _maybe_casted_values(lev, lab)
+                new_obj.insert(0, name, level_values)
+
+        new_obj.index = new_index
+        if not inplace:
+            return new_obj
 
     def rfloordiv(self, other, axis='columns', level=None, fill_value=None):
         raise NotImplementedError("Not Yet implemented.")
@@ -1155,11 +1306,116 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def set_axis(self, labels, axis=0, inplace=None):
-        raise NotImplementedError("Not Yet implemented.")
+        """Assign desired index to given axis.
+
+        Args:
+            labels (pd.Index or list-like): The Index to assign.
+            axis (string or int): The axis to reassign.
+            inplace (bool): Whether to make these modifications inplace.
+
+        Returns:
+            If inplace is False, returns a new DataFrame, otherwise None.
+        """
+        if is_scalar(labels):
+            warnings.warn(
+                'set_axis now takes "labels" as first argument, and '
+                '"axis" as named parameter. The old form, with "axis" as '
+                'first parameter and \"labels\" as second, is still supported '
+                'but will be deprecated in a future version of pandas.',
+                FutureWarning, stacklevel=2)
+            labels, axis = axis, labels
+
+        if inplace is None:
+            warnings.warn(
+                'set_axis currently defaults to operating inplace.\nThis '
+                'will change in a future version of pandas, use '
+                'inplace=True to avoid this warning.',
+                FutureWarning, stacklevel=2)
+            inplace = True
+        if inplace:
+            setattr(self, self._index._get_axis_name(axis), labels)
+        else:
+            obj = self.copy()
+            obj.set_axis(labels, axis=axis, inplace=True)
+            return obj
 
     def set_index(self, keys, drop=True, append=False, inplace=False,
                   verify_integrity=False):
-        raise NotImplementedError("Not Yet implemented.")
+        """Set the DataFrame index using one or more existing columns.
+
+        Args:
+            keys: column label or list of column labels / arrays.
+            drop (boolean): Delete columns to be used as the new index.
+            append (boolean): Whether to append columns to existing index.
+            inplace (boolean): Modify the DataFrame in place.
+            verify_integrity (boolean): Check the new index for duplicates.
+                Otherwise defer the check until necessary. Setting to False
+                will improve the performance of this method
+
+        Returns:
+            If inplace is set to false returns a new DataFrame, otherwise None.
+        """
+        inplace = validate_bool_kwarg(inplace, 'inplace')
+        if not isinstance(keys, list):
+            keys = [keys]
+
+        if inplace:
+            frame = self
+        else:
+            frame = self.copy()
+
+        arrays = []
+        names = []
+        if append:
+            names = [x for x in self.index.names]
+            if isinstance(self.index, pd.MultiIndex):
+                for i in range(self.index.nlevels):
+                    arrays.append(self.index._get_level_values(i))
+            else:
+                arrays.append(self.index)
+
+        to_remove = []
+        for col in keys:
+            if isinstance(col, pd.MultiIndex):
+                # append all but the last column so we don't have to modify
+                # the end of this loop
+                for n in range(col.nlevels - 1):
+                    arrays.append(col._get_level_values(n))
+
+                level = col._get_level_values(col.nlevels - 1)
+                names.extend(col.names)
+            elif isinstance(col, pd.Series):
+                level = col._values
+                names.append(col.name)
+            elif isinstance(col, pd.Index):
+                level = col
+                names.append(col.name)
+            elif isinstance(col, (list, np.ndarray, pd.Index)):
+                level = col
+                names.append(None)
+            else:
+                level = frame[col]._values
+                names.append(col)
+                if drop:
+                    to_remove.append(col)
+            arrays.append(level)
+
+        index = _ensure_index_from_sequences(arrays, names)
+
+        if verify_integrity and not index.is_unique:
+            duplicates = index.get_duplicates()
+            raise ValueError('Index has duplicate keys: %s' % duplicates)
+
+        for c in to_remove:
+            del frame[c]
+
+        # clear up memory usage
+        index._cleanup()
+
+        frame.index = index
+
+        if not inplace:
+            return frame
 
     def set_value(self, index, col, value, takeable=False):
         raise NotImplementedError("Not Yet implemented.")
@@ -1416,7 +1672,7 @@ class DataFrame(object):
         raise NotImplementedError("Not Yet implemented.")
 
     def __contains__(self, key):
-        raise NotImplementedError("Not Yet implemented.")
+        return key in self.columns
 
     def __nonzero__(self):
         raise NotImplementedError("Not Yet implemented.")
@@ -1715,4 +1971,7 @@ def to_pandas(df):
     Returns:
         A new pandas DataFrame.
     """
-    return pd.concat(ray.get(df._df))
+    pd_df = pd.concat(ray.get(df._df))
+    pd_df.index = df.index
+    pd_df.columns = df.columns
+    return pd_df
diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py
index d49f71fb3..7dd63059b 100644
--- a/python/ray/dataframe/test/test_dataframe.py
+++ b/python/ray/dataframe/test/test_dataframe.py
@@ -216,6 +216,29 @@ def test_int_dataframe():
     test_notna(ray_df, pandas_df)
     test_notnull(ray_df, pandas_df)
 
+    labels = ['a', 'b', 'c', 'd']
+    test_set_axis(ray_df, pandas_df, labels, 0)
+    test_set_axis(ray_df, pandas_df, labels, 'rows')
+    labels.append('e')
+    test_set_axis(ray_df, pandas_df, labels, 1)
+    test_set_axis(ray_df, pandas_df, labels, 'columns')
+
+    for key in keys:
+        test_set_index(ray_df, pandas_df, key)
+
+    test_reset_index(ray_df, pandas_df)
+    test_reset_index(ray_df, pandas_df, inplace=True)
+
+    for key in keys:
+        test___contains__(ray_df, key, True)
+    test___contains__(ray_df, "Not Exists", False)
+
+    for key in keys:
+        test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key])
+        test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key])
+
 
 def test_float_dataframe():
 
@@ -288,6 +311,26 @@ def test_float_dataframe():
     test_iteritems(ray_df, pandas_df)
     test_itertuples(ray_df, pandas_df)
 
+    labels = ['a', 'b', 'c', 'd']
+    test_set_axis(ray_df, pandas_df, labels, 0)
+    test_set_axis(ray_df, pandas_df, labels, 'rows')
+    labels.append('e')
+    test_set_axis(ray_df, pandas_df, labels, 1)
+    test_set_axis(ray_df, pandas_df, labels, 'columns')
+
+    for key in keys:
+        test_set_index(ray_df, pandas_df, key)
+        test_set_index(ray_df, pandas_df, key, inplace=True)
+
+    test_reset_index(ray_df, pandas_df)
+    test_reset_index(ray_df, pandas_df, inplace=True)
+
+    for key in keys:
+        test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key])
+        test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key])
+
 
 def test_mixed_dtype_dataframe():
     pandas_df = pd.DataFrame({
@@ -365,6 +408,25 @@ def test_mixed_dtype_dataframe():
     test_iteritems(ray_df, pandas_df)
     test_itertuples(ray_df, pandas_df)
 
+    labels = ['a', 'b', 'c', 'd']
+    test_set_axis(ray_df, pandas_df, labels, 0)
+    test_set_axis(ray_df, pandas_df, labels, 'rows')
+    test_set_axis(ray_df, pandas_df, labels, 1)
+    test_set_axis(ray_df, pandas_df, labels, 'columns')
+
+    for key in keys:
+        test_set_index(ray_df, pandas_df, key)
+        test_set_index(ray_df, pandas_df, key, inplace=True)
+
+    test_reset_index(ray_df, pandas_df)
+    test_reset_index(ray_df, pandas_df, inplace=True)
+
+    for key in keys:
+        test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key])
+        test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key])
+
 
 def test_nan_dataframe():
     pandas_df = pd.DataFrame({
@@ -435,6 +497,25 @@ def test_nan_dataframe():
     test_iteritems(ray_df, pandas_df)
     test_itertuples(ray_df, pandas_df)
 
+    labels = ['a', 'b', 'c', 'd']
+    test_set_axis(ray_df, pandas_df, labels, 0)
+    test_set_axis(ray_df, pandas_df, labels, 'rows')
+    test_set_axis(ray_df, pandas_df, labels, 1)
+    test_set_axis(ray_df, pandas_df, labels, 'columns')
+
+    for key in keys:
+        test_set_index(ray_df, pandas_df, key)
+        test_set_index(ray_df, pandas_df, key, inplace=True)
+
+    test_reset_index(ray_df, pandas_df)
+    test_reset_index(ray_df, pandas_df, inplace=True)
+
+    for key in keys:
+        test_insert(ray_df, pandas_df, 0, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 0, "New Column", pandas_df[key])
+        test_insert(ray_df, pandas_df, 1, "New Column", ray_df[key])
+        test_insert(ray_df, pandas_df, 4, "New Column", ray_df[key])
+
 
 def test_add():
     ray_df = create_test_dataframe()
@@ -902,11 +983,13 @@ def test_info():
         ray_df.info()
 
 
-def test_insert():
-    ray_df = create_test_dataframe()
+@pytest.fixture
+def test_insert(ray_df, pandas_df, loc, column, value):
+    ray_df_cp = ray_df.copy()
+    pd_df_cp = pandas_df.copy()
 
-    with pytest.raises(NotImplementedError):
-        ray_df.insert(None, None, None)
+    ray_df_cp.insert(loc, column, value)
+    pd_df_cp.insert(loc, column, value)
 
 
 def test_interpolate():
@@ -1307,11 +1390,19 @@ def test_resample():
         ray_df.resample(None)
 
 
-def test_reset_index():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.reset_index()
+@pytest.fixture
+def test_reset_index(ray_df, pandas_df, inplace=False):
+    if not inplace:
+        print(rdf.to_pandas(ray_df.reset_index(inplace=inplace)).index)
+        print(pandas_df.reset_index(inplace=inplace))
+        assert rdf.to_pandas(ray_df.reset_index(inplace=inplace)).equals(
+            pandas_df.reset_index(inplace=inplace))
+    else:
+        ray_df_cp = ray_df.copy()
+        pd_df_cp = pandas_df.copy()
+        ray_df_cp.reset_index(inplace=inplace)
+        pd_df_cp.reset_index(inplace=inplace)
+        assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp)
 
 
 def test_rfloordiv():
@@ -1397,18 +1488,23 @@ def test_sem():
         ray_df.sem()
 
 
-def test_set_axis():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.set_axis(None)
+@pytest.fixture
+def test_set_axis(ray_df, pandas_df, label, axis):
+    assert rdf.to_pandas(ray_df.set_axis(label, axis, inplace=False)).equals(
+        pandas_df.set_axis(label, axis, inplace=False))
 
 
-def test_set_index():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.set_index(None)
+@pytest.fixture
+def test_set_index(ray_df, pandas_df, keys, inplace=False):
+    if not inplace:
+        assert rdf.to_pandas(ray_df.set_index(keys)).equals(
+            pandas_df.set_index(keys))
+    else:
+        ray_df_cp = ray_df.copy()
+        pd_df_cp = pandas_df.copy()
+        ray_df_cp.set_index(keys, inplace=inplace)
+        pd_df_cp.set_index(keys, inplace=inplace)
+        assert rdf.to_pandas(ray_df_cp).equals(pd_df_cp)
 
 
 def test_set_value():
@@ -1817,11 +1913,10 @@ def test___iter__():
         ray_df.__iter__()
 
 
-def test___contains__():
-    ray_df = create_test_dataframe()
-
-    with pytest.raises(NotImplementedError):
-        ray_df.__contains__(None)
+@pytest.fixture
+def test___contains__(ray_df, key, result):
+    assert result == ray_df.__contains__(key)
+    assert result == (key in ray_df)
 
 
 def test___nonzero__():