[DataFrame] Sample implement (#1954)

* implemented sample - need to test

* sample fully working

* added sanity check tests

* added some comments to clarify the _deploy_func call

* some more clarifying comments

* added explanatory comments

* minor change in weights_sum for sample
This commit is contained in:
Omkar Salpekar
2018-04-30 10:42:28 -07:00
committed by Devin Petersohn
parent 0c477fbbca
commit 1231aa0582
4 changed files with 168 additions and 12 deletions
+3 -2
View File
@@ -3,7 +3,7 @@ from __future__ import division
from __future__ import print_function
import pandas as pd
from pandas import eval
from pandas import (eval, Panel, date_range, MultiIndex)
import threading
pd_version = pd.__version__
@@ -37,7 +37,8 @@ from .io import (read_csv, read_parquet, read_json, read_html, # noqa: 402
from .concat import concat # noqa: 402
__all__ = [
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval"
"DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval",
"Panel", "date_range", "MultiIndex"
]
try:
+161 -4
View File
@@ -9,7 +9,7 @@ from pandas.core.index import _ensure_index_from_sequences
from pandas._libs import lib
from pandas.core.dtypes.cast import maybe_upcast_putmask
from pandas import compat
from pandas.compat import lzip, cPickle as pkl
from pandas.compat import lzip, string_types, cPickle as pkl
import pandas.core.common as com
from pandas.core.dtypes.common import (
is_bool_dtype,
@@ -2959,9 +2959,166 @@ class DataFrame(object):
def sample(self, n=None, frac=None, replace=False, weights=None,
random_state=None, axis=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Returns a random sample of items from an axis of object.
Args:
n: Number of items from axis to return. Cannot be used with frac.
Default = 1 if frac = None.
frac: Fraction of axis items to return. Cannot be used with n.
replace: Sample with or without replacement. Default = False.
weights: Default None results in equal probability weighting.
If passed a Series, will align with target object on index.
Index values in weights not found in sampled object will be
ignored and index values in sampled object not in weights will
be assigned weights of zero. If called on a DataFrame, will
accept the name of a column when axis = 0. Unless weights are
a Series, weights must be same length as axis being sampled.
If weights do not sum to 1, they will be normalized to sum
to 1. Missing values in the weights column will be treated as
zero. inf and -inf values not allowed.
random_state: Seed for the random number generator (if int), or
numpy RandomState object.
axis: Axis to sample. Accepts axis number or name.
Returns:
A new Dataframe
"""
axis = pd.DataFrame()._get_axis_number(axis) if axis is not None \
else 0
if axis == 0:
axis_length = len(self._row_metadata)
else:
axis_length = len(self._col_metadata)
if weights is not None:
# Index of the weights Series should correspond to the index of the
# Dataframe in order to sample
if isinstance(weights, pd.Series):
weights = weights.reindex(self.axes[axis])
# If weights arg is a string, the weights used for sampling will
# the be values in the column corresponding to that string
if isinstance(weights, string_types):
if axis == 0:
try:
weights = self[weights]
except KeyError:
raise KeyError("String passed to weights not a "
"valid column")
else:
raise ValueError("Strings can only be passed to "
"weights when sampling from rows on "
"a DataFrame")
weights = pd.Series(weights, dtype='float64')
if len(weights) != axis_length:
raise ValueError("Weights and axis to be sampled must be of "
"same length")
if (weights == np.inf).any() or (weights == -np.inf).any():
raise ValueError("weight vector may not include `inf` values")
if (weights < 0).any():
raise ValueError("weight vector many not include negative "
"values")
# weights cannot be NaN when sampling, so we must set all nan
# values to 0
weights = weights.fillna(0)
# If passed in weights are not equal to 1, renormalize them
# otherwise numpy sampling function will error
weights_sum = weights.sum()
if weights_sum != 1:
if weights_sum != 0:
weights = weights / weights_sum
else:
raise ValueError("Invalid weights: weights sum to zero")
weights = weights.values
if n is None and frac is None:
# default to n = 1 if n and frac are both None (in accordance with
# Pandas specification)
n = 1
elif n is not None and frac is None and n % 1 != 0:
# n must be an integer
raise ValueError("Only integers accepted as `n` values")
elif n is None and frac is not None:
# compute the number of samples based on frac
n = int(round(frac * axis_length))
elif n is not None and frac is not None:
# Pandas specification does not allow both n and frac to be passed
# in
raise ValueError('Please enter a value for `frac` OR `n`, not '
'both')
if n < 0:
raise ValueError("A negative number of rows requested. Please "
"provide positive value.")
if n == 0:
# An Empty DataFrame is returned if the number of samples is 0.
# The Empty Dataframe should have either columns or index specified
# depending on which axis is passed in.
return DataFrame(columns=[] if axis == 1 else self.columns,
index=self.index if axis == 1 else [])
if axis == 1:
axis_labels = self.columns
partition_metadata = self._col_metadata
partitions = self._col_partitions
else:
axis_labels = self.index
partition_metadata = self._row_metadata
partitions = self._row_partitions
if random_state is not None:
# Get a random number generator depending on the type of
# random_state that is passed in
if isinstance(random_state, int):
random_num_gen = np.random.RandomState(random_state)
elif isinstance(random_state, np.random.randomState):
random_num_gen = random_state
else:
# random_state must be an int or a numpy RandomState object
raise ValueError("Please enter an `int` OR a "
"np.random.RandomState for random_state")
# choose random numbers and then get corresponding labels from
# chosen axis
sample_indices = random_num_gen.randint(
low=0,
high=len(partition_metadata),
size=n)
samples = axis_labels[sample_indices]
else:
# randomly select labels from chosen axis
samples = np.random.choice(a=axis_labels, size=n,
replace=replace, p=weights)
# create an array of (partition, index_within_partition) tuples for
# each sample
part_ind_tuples = [partition_metadata[sample]
for sample in samples]
if axis == 1:
# tup[0] refers to the partition number and tup[1] is the index
# within that partition
new_cols = [_deploy_func.remote(lambda df: df.iloc[:, [tup[1]]],
partitions[tup[0]]) for tup in part_ind_tuples]
return DataFrame(col_partitions=new_cols,
columns=samples,
index=self.index)
else:
new_rows = [_deploy_func.remote(lambda df: df.loc[[tup[1]]],
partitions[tup[0]]) for tup in part_ind_tuples]
return DataFrame(row_partitions=new_rows,
columns=self.columns,
index=samples)
def select(self, crit, axis=0):
raise NotImplementedError(
+2 -3
View File
@@ -2546,9 +2546,8 @@ def test_rtruediv():
def test_sample():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.sample()
assert len(ray_df.sample(n=4)) == 4
assert len(ray_df.sample(frac=0.5)) == 2
def test_select():
+2 -3
View File
@@ -161,8 +161,7 @@ def _map_partitions(func, partitions, *argslists):
def _build_columns(df_col, columns):
"""Build columns and compute lengths for each partition."""
# Columns and width
widths = np.array(ray.get([_deploy_func.remote(lambda df: len(df.columns),
d)
widths = np.array(ray.get([_deploy_func.remote(_get_widths, d)
for d in df_col]))
dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(widths))
for p_sub_idx in range(widths[p_idx])]
@@ -190,7 +189,7 @@ def _build_index(df_row, index):
def _create_block_partitions(partitions, axis=0, length=None):
if length is not None and get_npartitions() > length:
if length is not None and length != 0 and get_npartitions() > length:
npartitions = length
else:
npartitions = get_npartitions()