From 1231aa0582298e0e380b634f836cb9360fd424b9 Mon Sep 17 00:00:00 2001 From: Omkar Salpekar Date: Mon, 30 Apr 2018 10:42:28 -0700 Subject: [PATCH] [DataFrame] Sample implement (#1954) * implemented sample - need to test * sample fully working * added sanity check tests * added some comments to clarify the _deploy_func call * some more clarifying comments * added explanatory comments * minor change in weights_sum for sample --- python/ray/dataframe/__init__.py | 5 +- python/ray/dataframe/dataframe.py | 165 +++++++++++++++++++- python/ray/dataframe/test/test_dataframe.py | 5 +- python/ray/dataframe/utils.py | 5 +- 4 files changed, 168 insertions(+), 12 deletions(-) diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 5ad6b85e8..7eea37f99 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -3,7 +3,7 @@ from __future__ import division from __future__ import print_function import pandas as pd -from pandas import eval +from pandas import (eval, Panel, date_range, MultiIndex) import threading pd_version = pd.__version__ @@ -37,7 +37,8 @@ from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402 from .concat import concat # noqa: 402 __all__ = [ - "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval" + "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval", + "Panel", "date_range", "MultiIndex" ] try: diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 4b67e5266..b96c4c836 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -9,7 +9,7 @@ from pandas.core.index import _ensure_index_from_sequences from pandas._libs import lib from pandas.core.dtypes.cast import maybe_upcast_putmask from pandas import compat -from pandas.compat import lzip, cPickle as pkl +from pandas.compat import lzip, string_types, cPickle as pkl import pandas.core.common as com from pandas.core.dtypes.common import ( is_bool_dtype, @@ -2959,9 +2959,166 @@ class DataFrame(object): def sample(self, n=None, frac=None, replace=False, weights=None, random_state=None, axis=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Returns a random sample of items from an axis of object. + + Args: + n: Number of items from axis to return. Cannot be used with frac. + Default = 1 if frac = None. + frac: Fraction of axis items to return. Cannot be used with n. + replace: Sample with or without replacement. Default = False. + weights: Default ‘None’ results in equal probability weighting. + If passed a Series, will align with target object on index. + Index values in weights not found in sampled object will be + ignored and index values in sampled object not in weights will + be assigned weights of zero. If called on a DataFrame, will + accept the name of a column when axis = 0. Unless weights are + a Series, weights must be same length as axis being sampled. + If weights do not sum to 1, they will be normalized to sum + to 1. Missing values in the weights column will be treated as + zero. inf and -inf values not allowed. + random_state: Seed for the random number generator (if int), or + numpy RandomState object. + axis: Axis to sample. Accepts axis number or name. + + Returns: + A new Dataframe + """ + + axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \ + else 0 + + if axis == 0: + axis_length = len(self._row_metadata) + else: + axis_length = len(self._col_metadata) + + if weights is not None: + + # Index of the weights Series should correspond to the index of the + # Dataframe in order to sample + if isinstance(weights, pd.Series): + weights = weights.reindex(self.axes[axis]) + + # If weights arg is a string, the weights used for sampling will + # the be values in the column corresponding to that string + if isinstance(weights, string_types): + if axis == 0: + try: + weights = self[weights] + except KeyError: + raise KeyError("String passed to weights not a " + "valid column") + else: + raise ValueError("Strings can only be passed to " + "weights when sampling from rows on " + "a DataFrame") + + weights = pd.Series(weights, dtype='float64') + + if len(weights) != axis_length: + raise ValueError("Weights and axis to be sampled must be of " + "same length") + + if (weights == np.inf).any() or (weights == -np.inf).any(): + raise ValueError("weight vector may not include `inf` values") + + if (weights < 0).any(): + raise ValueError("weight vector many not include negative " + "values") + + # weights cannot be NaN when sampling, so we must set all nan + # values to 0 + weights = weights.fillna(0) + + # If passed in weights are not equal to 1, renormalize them + # otherwise numpy sampling function will error + weights_sum = weights.sum() + if weights_sum != 1: + if weights_sum != 0: + weights = weights / weights_sum + else: + raise ValueError("Invalid weights: weights sum to zero") + + weights = weights.values + + if n is None and frac is None: + # default to n = 1 if n and frac are both None (in accordance with + # Pandas specification) + n = 1 + elif n is not None and frac is None and n % 1 != 0: + # n must be an integer + raise ValueError("Only integers accepted as `n` values") + elif n is None and frac is not None: + # compute the number of samples based on frac + n = int(round(frac * axis_length)) + elif n is not None and frac is not None: + # Pandas specification does not allow both n and frac to be passed + # in + raise ValueError('Please enter a value for `frac` OR `n`, not ' + 'both') + if n < 0: + raise ValueError("A negative number of rows requested. Please " + "provide positive value.") + + if n == 0: + # An Empty DataFrame is returned if the number of samples is 0. + # The Empty Dataframe should have either columns or index specified + # depending on which axis is passed in. + return DataFrame(columns=[] if axis == 1 else self.columns, + index=self.index if axis == 1 else []) + + if axis == 1: + axis_labels = self.columns + partition_metadata = self._col_metadata + partitions = self._col_partitions + else: + axis_labels = self.index + partition_metadata = self._row_metadata + partitions = self._row_partitions + + if random_state is not None: + # Get a random number generator depending on the type of + # random_state that is passed in + if isinstance(random_state, int): + random_num_gen = np.random.RandomState(random_state) + elif isinstance(random_state, np.random.randomState): + random_num_gen = random_state + else: + # random_state must be an int or a numpy RandomState object + raise ValueError("Please enter an `int` OR a " + "np.random.RandomState for random_state") + + # choose random numbers and then get corresponding labels from + # chosen axis + sample_indices = random_num_gen.randint( + low=0, + high=len(partition_metadata), + size=n) + samples = axis_labels[sample_indices] + else: + # randomly select labels from chosen axis + samples = np.random.choice(a=axis_labels, size=n, + replace=replace, p=weights) + + # create an array of (partition, index_within_partition) tuples for + # each sample + part_ind_tuples = [partition_metadata[sample] + for sample in samples] + + if axis == 1: + # tup[0] refers to the partition number and tup[1] is the index + # within that partition + new_cols = [_deploy_func.remote(lambda df: df.iloc[:, [tup[1]]], + partitions[tup[0]]) for tup in part_ind_tuples] + return DataFrame(col_partitions=new_cols, + columns=samples, + index=self.index) + else: + new_rows = [_deploy_func.remote(lambda df: df.loc[[tup[1]]], + partitions[tup[0]]) for tup in part_ind_tuples] + return DataFrame(row_partitions=new_rows, + columns=self.columns, + index=samples) def select(self, crit, axis=0): raise NotImplementedError( diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index e84ddea41..60d2862d9 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -2546,9 +2546,8 @@ def test_rtruediv(): def test_sample(): ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.sample() + assert len(ray_df.sample(n=4)) == 4 + assert len(ray_df.sample(frac=0.5)) == 2 def test_select(): diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 22cf7eae4..97c166d09 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -161,8 +161,7 @@ def _map_partitions(func, partitions, *argslists): def _build_columns(df_col, columns): """Build columns and compute lengths for each partition.""" # Columns and width - widths = np.array(ray.get([_deploy_func.remote(lambda df: len(df.columns), - d) + widths = np.array(ray.get([_deploy_func.remote(_get_widths, d) for d in df_col])) dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(widths)) for p_sub_idx in range(widths[p_idx])] @@ -190,7 +189,7 @@ def _build_index(df_row, index): def _create_block_partitions(partitions, axis=0, length=None): - if length is not None and get_npartitions() > length: + if length is not None and length != 0 and get_npartitions() > length: npartitions = length else: npartitions = get_npartitions()