diff --git a/.editorconfig b/.editorconfig deleted file mode 100644 index 243ac62d6..000000000 --- a/.editorconfig +++ /dev/null @@ -1,11 +0,0 @@ -# top-most EditorConfig file -root = true - -[*.{h,c,cpp,hpp,cc,cxx}] -indent_style = space -indent_size = 2 -indent_brace_style = K&R - -[*.py] -indent_style = space -indent_size = 2 diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 132c53047..a300fd8e9 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -7,8 +7,6 @@ import numpy as np import ray import itertools -from .index import Index - class DataFrame(object): @@ -20,22 +18,21 @@ class DataFrame(object): partitions. columns (pandas.Index): The column names for this dataframe, in pandas Index object. + index (pandas.Index or list): The row index for this dataframe. """ assert(len(df) > 0) self._df = df - # TODO: Clean up later. - # We will call get only when we access the object (and only once). - self._lengths = \ - ray.get([_deploy_func.remote(_get_lengths, d) for d in self._df]) + self._lengths = [_deploy_func.remote(_get_lengths, d) + for d in self._df] self.columns = columns - if index is None: - self._index = self._default_index() - else: - self._index = index + # this _index object is a pd.DataFrame + # and we use that DataFrame's Index to index the rows. + self._index = self._default_index() - self._pd_index = None + if index is not None: + self.index = index def __str__(self): return "ray.DataFrame object" @@ -49,10 +46,7 @@ class DataFrame(object): Returns: The union of all indexes across the partitions. """ - if self._pd_index is None: - self._pd_index = Index.to_pandas(self._index) - - return self._pd_index + return self._index.index def _set_index(self, new_index): """Set the index for this DataFrame. @@ -60,18 +54,47 @@ class DataFrame(object): Args: new_index: The new index to set this """ - self._pd_index = None - self._index = Index.from_pandas(new_index, self._lengths) + self._index.index = new_index def _default_index(self): - dest_indices = [(i, j) - for i in range(len(self._lengths)) - for j in range(self._lengths[i])] - return Index({i: dest_indices[i] for i in range(len(dest_indices))}, - pd.RangeIndex) + """Create a default index, which is a RangeIndex + + Returns: + The pd.RangeIndex object that represents this DataFrame. + """ + dest_indices = {"partition": + [i for i in range(len(self._lengths)) + for j in range(self._lengths[i])], + "index_within_partition": + [j for i in range(len(self._lengths)) + for j in range(self._lengths[i])]} + return pd.DataFrame(dest_indices) index = property(_get_index, _set_index) + def _get_lengths(self): + """Gets the lengths for each partition and caches it if it wasn't. + + Returns: + A list of integers representing the length of each partition. + """ + if isinstance(self._length_cache[0], ray.local_scheduler.ObjectID): + self._length_cache = ray.get(self._length_cache) + return self._length_cache + + def _set_lengths(self, lengths): + """Sets the lengths of each partition for this DataFrame. + + We use this because we can compute it when creating the DataFrame. + + Args: + lengths ([ObjectID or Int]): A list of lengths for each + partition, in order. + """ + self._length_cache = lengths + + _lengths = property(_get_lengths, _set_lengths) + @property def size(self): """Get the number of elements in the DataFrame. @@ -79,8 +102,7 @@ class DataFrame(object): Returns: The number of elements in the DataFrame. """ - sizes = ray.get(self._map_partitions(lambda df: df.size)._df) - return sum(sizes) + return len(self.index) * len(self.columns) @property def ndim(self): @@ -154,7 +176,7 @@ class DataFrame(object): """ return (len(self.index), len(self.columns)) - def _map_partitions(self, func, *args): + def _map_partitions(self, func, index=None): """Apply a function on each partition. Args: @@ -165,8 +187,10 @@ class DataFrame(object): """ assert(callable(func)) new_df = [_deploy_func.remote(func, part) for part in self._df] + if index is None: + index = self.index - return DataFrame(new_df, self.columns, index=self._index) + return DataFrame(new_df, self.columns, index=index) def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -174,9 +198,8 @@ class DataFrame(object): Returns: A new DataFrame containing the new column names. """ - new_dfs = self._map_partitions(lambda df: df.add_prefix(prefix)) new_cols = self.columns.map(lambda x: str(prefix) + str(x)) - return DataFrame(new_dfs._df, new_cols, index=self._index) + return DataFrame(self._df, new_cols, index=self.index) def add_suffix(self, suffix): """Add a suffix to each of the column names. @@ -184,9 +207,8 @@ class DataFrame(object): Returns: A new DataFrame containing the new column names. """ - new_dfs = self._map_partitions(lambda df: df.add_suffix(suffix)) new_cols = self.columns.map(lambda x: str(x) + str(suffix)) - return DataFrame(new_dfs._df, new_cols, index=self._index) + return DataFrame(self._df, new_cols, index=self.index) def applymap(self, func): """Apply a function to a DataFrame elementwise. @@ -203,7 +225,7 @@ class DataFrame(object): Returns: A new DataFrame pointing to the same partitions as this one. """ - return DataFrame(self._df, self.columns, index=self._index) + return DataFrame(self._df, self.columns, index=self.index) def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, group_keys=True, squeeze=False, **kwargs): @@ -221,8 +243,7 @@ class DataFrame(object): A new DataFrame resulting from the groupby. """ - indices = list(set( - [index for df in ray.get(self._df) for index in list(df.index)])) + indices = self.index.unique() chunksize = int(len(indices) / len(self._df)) partitions = [_shuffle.remote(df, indices, chunksize) @@ -238,7 +259,7 @@ class DataFrame(object): shuffle[i].append(partitions[j][i]) new_dfs = [_local_groupby.remote(part, axis=axis) for part in shuffle] - return DataFrame(new_dfs, self.columns) + return DataFrame(new_dfs, self.columns, index=indices) def reduce_by_index(self, func, axis=0): """Perform a reduction based on the row index. @@ -250,7 +271,8 @@ class DataFrame(object): Returns: A new DataFrame with the result of the reduction. """ - return self.groupby(axis=axis)._map_partitions(func) + return self.groupby(axis=axis)._map_partitions( + func, index=pd.unique(self.index)) def sum(self, axis=None, skipna=True, level=None, numeric_only=None): """Perform a sum across the DataFrame. @@ -262,9 +284,14 @@ class DataFrame(object): Returns: The sum of the DataFrame. """ + intermediate_index = [idx + for _ in range(len(self._df)) + for idx in self.columns] + sum_of_partitions = self._map_partitions( lambda df: df.sum(axis=axis, skipna=skipna, level=level, - numeric_only=numeric_only)) + numeric_only=numeric_only), + index=intermediate_index) return sum_of_partitions.reduce_by_index( lambda df: df.sum(axis=axis, skipna=skipna, level=level, @@ -276,6 +303,10 @@ class DataFrame(object): Returns: A new DataFrame with the applied absolute value. """ + for t in self.dtypes: + if np.dtype('O') == t: + # TODO Give a more accurate error to Pandas + raise TypeError("bad operand type for abs():", "str") return self._map_partitions(lambda df: df.abs()) def isin(self, values): @@ -321,7 +352,7 @@ class DataFrame(object): A pandas Index for this DataFrame. """ # Each partition should have the same index, so we'll use 0's - return ray.get(_deploy_func.remote(lambda df: df.keys(), self._df[0])) + return self.columns def transpose(self, *args, **kwargs): """Transpose columns and rows for the DataFrame. @@ -331,8 +362,14 @@ class DataFrame(object): Returns: A new DataFrame transposed from this DataFrame. """ + temp_index = [idx + for _ in range(len(self._df)) + for idx in self.columns] + + temp_columns = self.index local_transpose = self._map_partitions( - lambda df: df.transpose(*args, **kwargs)) + lambda df: df.transpose(*args, **kwargs), index=temp_index) + local_transpose.columns = temp_columns # Sum will collapse the NAs from the groupby return local_transpose.reduce_by_index( @@ -387,17 +424,15 @@ class DataFrame(object): if axis is None or axis == 0: df = self.T axis = 1 - ordered_index = df.columns else: df = self - ordered_index = df.index mapped = df._map_partitions(lambda df: df.all(axis, bool_only, skipna, level, **kwargs)) - return to_pandas(mapped)[ordered_index] + return to_pandas(mapped) def any(self, axis=None, bool_only=None, skipna=None, level=None, **kwargs): @@ -410,17 +445,15 @@ class DataFrame(object): if axis is None or axis == 0: df = self.T axis = 1 - ordered_index = df.columns else: df = self - ordered_index = df.index mapped = df._map_partitions(lambda df: df.any(axis, bool_only, skipna, level, **kwargs)) - return to_pandas(mapped)[ordered_index] + return to_pandas(mapped) def append(self, other, ignore_index=False, verify_integrity=False): raise NotImplementedError("Not Yet implemented.") @@ -514,14 +547,17 @@ class DataFrame(object): def count(self, axis=0, level=None, numeric_only=False): if axis == 1: - original_index = self.index return self.T.count(axis=0, level=level, - numeric_only=numeric_only)[original_index] + numeric_only=numeric_only) else: + temp_index = [idx + for _ in range(len(self._df)) + for idx in self.columns] + return sum(ray.get(self._map_partitions(lambda df: df.count( axis=axis, level=level, numeric_only=numeric_only - ))._df)) + ), index=temp_index)._df)) def cov(self, min_periods=None): raise NotImplementedError("Not Yet implemented.") @@ -677,20 +713,29 @@ class DataFrame(object): Returns: A new dataframe with the first n rows of the dataframe. """ - sizes = ray.get(self._map_partitions(lambda df: df.size)._df) - new_dfs = [] - i = 0 - while n > 0 and i < len(self._df): - if (n - sizes[i]) < 0: - new_dfs.append(_deploy_func.remote(lambda df: df.head(n), - self._df[i])) - break - else: - new_dfs.append(self._df[i]) - n -= sizes[i] - i += 1 + sizes = self._lengths - return DataFrame(new_dfs, self.columns) + if n >= sum(sizes): + return self + + cumulative = np.cumsum(np.array(sizes)) + new_dfs = [self._df[i] + for i in range(len(cumulative)) + if cumulative[i] < n] + + last_index = len(new_dfs) + + # this happens when we only need from the first partition + if last_index == 0: + num_to_transfer = n + else: + num_to_transfer = n - cumulative[last_index - 1] + + new_dfs.append(_deploy_func.remote(lambda df: df.head(num_to_transfer), + self._df[last_index])) + + index = self._index.head(n).index + return DataFrame(new_dfs, self.columns, index=index) def hist(self, data, column=None, by=None, grid=True, xlabelsize=None, xrot=None, ylabelsize=None, yrot=None, ax=None, sharex=False, @@ -708,6 +753,10 @@ class DataFrame(object): A Series with the index for each maximum value for the axis specified. """ + for t in self.dtypes: + if np.dtype('O') == t: + # TODO Give a more accurate error to Pandas + raise TypeError("bad operand type for abs():", "str") if axis == 1: return to_pandas(self._map_partitions( lambda df: df.idxmax(axis=axis, skipna=skipna))) @@ -725,6 +774,10 @@ class DataFrame(object): A Series with the index for each minimum value for the axis specified. """ + for t in self.dtypes: + if np.dtype('O') == t: + # TODO Give a more accurate error to Pandas + raise TypeError("bad operand type for abs():", "str") if axis == 1: return to_pandas(self._map_partitions( lambda df: df.idxmin(axis=axis, skipna=skipna))) @@ -1165,21 +1218,31 @@ class DataFrame(object): Returns: A new dataframe with the last n rows of this dataframe. """ - sizes = ray.get(self._map_partitions(lambda df: df.size)._df) - new_dfs = [] - i = len(self._df) - 1 - while n > 0 and i >= 0: - if (n - sizes[i]) < 0: - new_dfs.append(_deploy_func.remote(lambda df: df.head(n), - self._df[i])) - break - else: - new_dfs.append(self._df[i]) - n -= sizes[i] - i -= 1 - # we were adding in reverse order, so make it right. - new_dfs.reverse() - return DataFrame(new_dfs, self.columns) + sizes = self._lengths + + if n >= sum(sizes): + return self + + cumulative = np.cumsum(np.array(sizes.reverse())) + + reverse_dfs = self._df.reverse() + new_dfs = [reverse_dfs[i] + for i in range(len(cumulative)) + if cumulative[i] < n] + + last_index = len(new_dfs) + + # this happens when we only need from the last partition + if last_index == 0: + num_to_transfer = n + else: + num_to_transfer = n - cumulative[last_index - 1] + + new_dfs.append(_deploy_func.remote(lambda df: df.tail(num_to_transfer), + reverse_dfs[last_index])).reverse() + + index = self._index.tail(n).index + return DataFrame(new_dfs, self.columns, index=index) def take(self, indices, axis=0, convert=None, is_copy=True, **kwargs): raise NotImplementedError("Not Yet implemented.") @@ -1623,27 +1686,24 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True): elif chunksize is None: raise ValueError("The number of partitions or chunksize must be set.") - old_index = df.index + temp_df = df - # TODO stop reassigning df dataframes = [] lengths = [] - while len(df) > chunksize: - t_df = df[:chunksize] + while len(temp_df) > chunksize: + t_df = temp_df[:chunksize] lengths.append(len(t_df)) # reindex here because we want a pd.RangeIndex within the partitions. # It is smaller and sometimes faster. t_df.reindex() top = ray.put(t_df) dataframes.append(top) - df = df[chunksize:] + temp_df = temp_df[chunksize:] else: - dataframes.append(ray.put(df)) - lengths.append(len(df)) + dataframes.append(ray.put(temp_df)) + lengths.append(len(temp_df)) - ray_index = Index.from_pandas(old_index, lengths) - - return DataFrame(dataframes, df.columns, index=ray_index) + return DataFrame(dataframes, df.columns, index=df.index) def to_pandas(df): diff --git a/python/ray/dataframe/index.py b/python/ray/dataframe/index.py deleted file mode 100644 index e42426a6c..000000000 --- a/python/ray/dataframe/index.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import pandas as pd - - -class Index(object): - - def __init__(self, idx, pandas_type): - self.idx = idx - self.pandas_type = pandas_type - - def __getitem__(self, item): - return self.idx[item] - - def __len__(self): - return len(self.idx) - - @classmethod - def to_pandas(cls, index): - """Convert a Ray Index object to a Pandas Index object. - - Args: - index (ray.Index): A Ray Index object. - - Returns: - A pandas Index object. - """ - k = index.idx.keys() - if index.pandas_type is pd.RangeIndex: - return pd.RangeIndex(min(k), max(k) + 1) - else: - return pd.Index(k) - - @classmethod - def from_pandas(cls, pd_index, lengths): - """Convert a Pandas Index object to a Ray Index object. - - Args: - pd_index (pd.Index): A Pandas Index object. - lengths ([int]): A list of lengths for the partitions. - - Returns: - A Ray Index object. - """ - dest_indices = [(i, j) - for i in range(len(lengths)) - for j in range(lengths[i])] - if len(pd_index) != len(dest_indices): - raise ValueError( - "Length of index given does not match current dataframe") - - return Index( - {pd_index[i]: dest_indices[i] for i in range(len(dest_indices))}, - type(pd_index)) diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index c3c9af0d4..d49f71fb3 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -194,6 +194,7 @@ def test_int_dataframe(): test___deepcopy__(ray_df, pandas_df) test_bool(ray_df, pandas_df) test_count(ray_df, pandas_df) + test_head(ray_df, pandas_df, 2) test_head(ray_df, pandas_df) test_tail(ray_df, pandas_df) test_idxmax(ray_df, pandas_df) @@ -224,7 +225,7 @@ def test_float_dataframe(): 'col4': [12.0, 13.0, 14.0, 15.0], 'col5': [0.0, 0.0, 0.0, 0.0]}) - ray_df = rdf.from_pandas(pandas_df, 2) + ray_df = rdf.from_pandas(pandas_df, 3) testfuncs = [lambda x: x + 1, lambda x: str(x), @@ -266,6 +267,7 @@ def test_float_dataframe(): test___deepcopy__(ray_df, pandas_df) test_bool(ray_df, pandas_df) test_count(ray_df, pandas_df) + test_head(ray_df, pandas_df, 3) test_head(ray_df, pandas_df) test_tail(ray_df, pandas_df) test_idxmax(ray_df, pandas_df) @@ -322,25 +324,47 @@ def test_mixed_dtype_dataframe(): test_copy(ray_df) test_sum(ray_df, pandas_df) + + with pytest.raises(TypeError): + test_abs(ray_df, pandas_df) + test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) + test_round(ray_df, pandas_df) + + test_all(ray_df, pandas_df) + test_any(ray_df, pandas_df) + test___getitem__(ray_df, pandas_df) + test___delitem__(ray_df, pandas_df) + test___copy__(ray_df, pandas_df) + test___deepcopy__(ray_df, pandas_df) + test_bool(ray_df, pandas_df) + test_count(ray_df, pandas_df) + test_head(ray_df, pandas_df, 2) + test_head(ray_df, pandas_df) + test_tail(ray_df, pandas_df) + + with pytest.raises(TypeError): + test_idxmax(ray_df, pandas_df) + with pytest.raises(TypeError): + test_idxmin(ray_df, pandas_df) + + test_pop(ray_df, pandas_df) + test_max(ray_df, pandas_df) + test_min(ray_df, pandas_df) + test_notna(ray_df, pandas_df) + test_notnull(ray_df, pandas_df) for key in keys: test_get(ray_df, pandas_df, key) test_get_dtype_counts(ray_df, pandas_df) test_get_ftype_counts(ray_df, pandas_df) - test_items(ray_df, pandas_df) test_iterrows(ray_df, pandas_df) test_items(ray_df, pandas_df) test_iteritems(ray_df, pandas_df) test_itertuples(ray_df, pandas_df) - test_max(ray_df, pandas_df) - test_min(ray_df, pandas_df) - test_notna(ray_df, pandas_df) - test_notnull(ray_df, pandas_df) - def test_nan_dataframe(): pandas_df = pd.DataFrame({ @@ -377,14 +401,39 @@ def test_nan_dataframe(): test_copy(ray_df) test_sum(ray_df, pandas_df) + test_abs(ray_df, pandas_df) test_keys(ray_df, pandas_df) test_transpose(ray_df, pandas_df) + test_round(ray_df, pandas_df) + + test_all(ray_df, pandas_df) + test_any(ray_df, pandas_df) + test___getitem__(ray_df, pandas_df) + test___delitem__(ray_df, pandas_df) + test___copy__(ray_df, pandas_df) + test___deepcopy__(ray_df, pandas_df) + test_bool(ray_df, pandas_df) + test_count(ray_df, pandas_df) + test_head(ray_df, pandas_df, 2) + test_head(ray_df, pandas_df) + test_tail(ray_df, pandas_df) + test_idxmax(ray_df, pandas_df) + test_idxmin(ray_df, pandas_df) + test_pop(ray_df, pandas_df) + test_max(ray_df, pandas_df) + test_min(ray_df, pandas_df) + test_notna(ray_df, pandas_df) + test_notnull(ray_df, pandas_df) for key in keys: test_get(ray_df, pandas_df, key) test_get_dtype_counts(ray_df, pandas_df) test_get_ftype_counts(ray_df, pandas_df) + test_iterrows(ray_df, pandas_df) + test_items(ray_df, pandas_df) + test_iteritems(ray_df, pandas_df) + test_itertuples(ray_df, pandas_df) def test_add(): @@ -816,8 +865,8 @@ def test_gt(): @pytest.fixture -def test_head(ray_df, pandas_df): - ray_df_equals_pandas(ray_df.head(), pandas_df.head()) +def test_head(ray_df, pandas_df, n=5): + ray_df_equals_pandas(ray_df.head(n), pandas_df.head(n)) def test_hist(): @@ -906,7 +955,7 @@ def test_itertuples(ray_df, pandas_df): ray_it_default = ray_df.itertuples() pandas_it_default = pandas_df.itertuples() for ray_row, pandas_row in zip(ray_it_default, pandas_it_default): - assert ray_row == pandas_row + np.testing.assert_equal(ray_row, pandas_row) # test all combinations of custom params indices = [True, False] @@ -917,7 +966,7 @@ def test_itertuples(ray_df, pandas_df): ray_it_custom = ray_df.itertuples(index=index, name=name) pandas_it_custom = pandas_df.itertuples(index=index, name=name) for ray_row, pandas_row in zip(ray_it_custom, pandas_it_custom): - assert ray_row == pandas_row + np.testing.assert_equal(ray_row, pandas_row) def test_join():