diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index bbd219542..c0e09d8ef 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -21,6 +21,9 @@ import warnings import numpy as np import ray import itertools +import io +import sys +import re from .utils import ( _deploy_func, @@ -85,6 +88,7 @@ class DataFrame(object): axis = 0 columns = pd_df.columns index = pd_df.index + self._row_metadata = self._col_metadata = None else: # created this invariant to make sure we never have to go into the # partitions to get the columns @@ -158,13 +162,16 @@ class DataFrame(object): def __str__(self): return repr(self) - def __repr__(self): - if len(self._row_metadata) < 60: - result = repr(to_pandas(self)) - return result + def _repr_helper_(self): + if len(self._row_metadata) <= 60 and \ + len(self._col_metadata) <= 20: + return to_pandas(self) - def head(df, n): + def head(df, n, get_local_head=False): """Compute the head for this without creating a new DataFrame""" + if get_local_head: + return df.head(n) + new_dfs = _map_partitions(lambda df: df.head(n), df) @@ -174,8 +181,10 @@ class DataFrame(object): pd_head.columns = self.columns return pd_head - def tail(df, n): + def tail(df, n, get_local_tail=False): """Compute the tail for this without creating a new DataFrame""" + if get_local_tail: + return df.tail(n) new_dfs = _map_partitions(lambda df: df.tail(n), df) @@ -186,25 +195,91 @@ class DataFrame(object): pd_tail.columns = self.columns return pd_tail + def front(df, n): + """Get first n columns without creating a new Dataframe""" + + cum_col_lengths = self._col_metadata._lengths.cumsum() + index = np.argmax(cum_col_lengths >= 10) + pd_front = pd.concat(ray.get(x[:index+1]), axis=1, copy=False) + pd_front = pd_front.iloc[:, :n] + pd_front.index = self.index + pd_front.columns = self.columns[:n] + return pd_front + + def back(df, n): + """Get last n columns without creating a new Dataframe""" + + cum_col_lengths = np.flip(self._col_metadata._lengths, + axis=0).cumsum() + index = np.argmax(cum_col_lengths >= 10) + pd_back = pd.concat(ray.get(x[-(index+1):]), axis=1, copy=False) + pd_back = pd_back.iloc[:, -n:] + pd_back.index = self.index + pd_back.columns = self.columns[-n:] + return pd_back + x = self._col_partitions - head = head(x, 30) - tail = tail(x, 30) + get_local_head = False + + # Get first and last 10 columns if there are more than 20 columns + if len(self._col_metadata) >= 20: + get_local_head = True + front = front(x, 10) + back = back(x, 10) + + col_dots = pd.Series(["..." + for _ in range(len(self.index))]) + col_dots.index = self.index + col_dots.name = "..." + x = pd.concat([front, col_dots, back], axis=1) + + # If less than 60 rows, x is already in the correct format. + if len(self._row_metadata) < 60: + return x + + head = head(x, 30, get_local_head) + tail = tail(x, 30, get_local_head) # Make the dots in between the head and tail - dots = pd.Series(["..." - for _ in range(self._block_partitions.shape[1])]) - dots.index = head.columns - dots.name = "..." + row_dots = pd.Series(["..." + for _ in range(len(head.columns))]) + row_dots.index = head.columns + row_dots.name = "..." # We have to do it this way or convert dots to a dataframe and # transpose. This seems better. - result = head.append(dots).append(tail) + result = head.append(row_dots).append(tail) + return result + def __repr__(self): # We use pandas repr so that we match them. + if len(self._row_metadata) <= 60 and \ + len(self._col_metadata) <= 20: + return repr(self._repr_helper_()) # The split here is so that we don't repr pandas row lengths. - return repr(result).split("\n\n")[0] + \ - "\n\n[{0} rows X {1} columns]".format(len(self.index), + result = self._repr_helper_() + final_result = repr(result).rsplit("\n\n", maxsplit=1)[0] + \ + "\n\n[{0} rows x {1} columns]".format(len(self.index), len(self.columns)) + return final_result + + def _repr_html_(self): + """repr function for rendering in Jupyter Notebooks like Pandas + Dataframes. + + Returns: + The HTML representation of a Dataframe. + """ + # We use pandas _repr_html_ to get a string of the HTML representation + # of the dataframe. + if len(self._row_metadata) <= 60 and \ + len(self._col_metadata) <= 20: + return self._repr_helper_()._repr_html_() + # We split so that we insert our correct dataframe dimensions. + result = self._repr_helper_()._repr_html_() + return result.split('

')[0] + \ + '

{0} rows × {1} columns

\n'.format(len(self.index), + len(self.columns)) def _get_index(self): """Get the index for this DataFrame. @@ -258,9 +333,12 @@ class DataFrame(object): # We use the index to get the internal index. oid_series = [(oid_series[i], i) for i in range(len(oid_series))] - for df, partition in oid_series: - this_partition = self._col_metadata.partition_series(partition) - df.index = this_partition[this_partition.isin(df.index)].index + if len(oid_series) > 1: + for df, partition in oid_series: + this_partition = \ + self._col_metadata.partition_series(partition) + df.index = \ + this_partition[this_partition.isin(df.index)].index result_series = pd.concat([obj[0] for obj in oid_series], axis=0, copy=False) @@ -1514,9 +1592,74 @@ class DataFrame(object): def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None, null_counts=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + + def info_helper(df): + output_buffer = io.StringIO() + df.info(verbose=verbose, + buf=output_buffer, + max_cols=max_cols, + memory_usage=memory_usage, + null_counts=null_counts) + return output_buffer.getvalue() + + # Combine the per-partition info and split into lines + result = ''.join(ray.get(_map_partitions(info_helper, + self._col_partitions))) + lines = result.split('\n') + + # Class denoted in info() output + class_string = '\n' + + # Create the Index info() string by parsing self.index + index_string = self.index.summary() + '\n' + + # A column header is needed in the inf() output + col_header = 'Data columns (total {0} columns):\n'.format( + len(self.columns)) + + # Parse the per-partition values to get the per-column details + # Find all the lines in the output that start with integers + prog = re.compile('^[0-9]+.+') + col_lines = [prog.match(line) for line in lines] + cols = [c.group(0) for c in col_lines if c is not None] + # replace the partition columns names with real column names + columns = ["{0}\t{1}\n".format(self.columns[i], + cols[i].split(" ", 1)[1]) + for i in range(len(cols))] + col_string = ''.join(columns) + '\n' + + # A summary of the dtypes in the dataframe + dtypes_string = "dtypes: " + for dtype, count in self.dtypes.value_counts().iteritems(): + dtypes_string += "{0}({1}),".format(dtype, count) + dtypes_string = dtypes_string[:-1] + '\n' + + # Compute the memory usage by summing per-partitions return values + # Parse lines for memory usage number + prog = re.compile('^memory+.+') + mems = [prog.match(line) for line in lines] + mem_vals = [float(re.search(r'\d+', m.group(0)).group()) + for m in mems if m is not None] + + memory_string = "" + + if len(mem_vals) != 0: + # Sum memory usage from each partition + if memory_usage != 'deep': + memory_string = 'memory usage: {0}+ bytes'.format( + sum(mem_vals)) + else: + memory_string = 'memory usage: {0} bytes'.format(sum(mem_vals)) + + # Combine all the components of the info() output + result = ''.join([class_string, index_string, col_header, + col_string, dtypes_string, memory_string]) + + # Write to specified output buffer + if buf: + buf.write(result) + else: + sys.stdout.write(result) def insert(self, loc, column, value, allow_duplicates=False): """Insert column into DataFrame at specified location. @@ -1552,9 +1695,6 @@ class DataFrame(object): df.insert(index_within_partition, column, value, allow_duplicates) return df - print('partition:', partition) - print('i_w_partition', index_within_partition) - print('df:\n', ray.get(self._col_partitions[partition])) new_obj = _deploy_func.remote(insert_col_part, self._col_partitions[partition]) new_cols = [self._col_partitions[i] @@ -1781,9 +1921,18 @@ class DataFrame(object): "github.com/ray-project/ray.") def memory_usage(self, index=True, deep=False): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + + def remote_func(df): + return df.memory_usage(index=False, deep=deep) + + result = self._arithmetic_helper(remote_func, axis=0) + + result.index = self.columns + if index: + index_value = self._row_metadata.index.memory_usage(deep=deep) + return pd.Series(index_value, index=['Index']).append(result) + + return result def merge(self, right, how='inner', on=None, left_on=None, right_on=None, left_index=False, right_index=False, sort=False, diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 46b42d286..7a323a33b 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -1648,11 +1648,12 @@ def test_infer_objects(): ray_df.infer_objects() -def test_info(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.info() +@pytest.fixture +def test_info(ray_df): + info_string = ray_df.info() + assert '\n' in info_string + info_string = ray_df.info(memory_usage=True) + assert 'memory_usage: ' in info_string @pytest.fixture @@ -1815,11 +1816,12 @@ def test_melt(): ray_df.melt() -def test_memory_usage(): - ray_df = create_test_dataframe() - - with pytest.raises(NotImplementedError): - ray_df.memory_usage() +@pytest.fixture +def test_memory_usage(ray_df): + assert type(ray_df.memory_usage()) is pd.core.series.Series + assert ray_df.memory_usage(index=True).at['Index'] is not None + assert ray_df.memory_usage(deep=True).sum() >= \ + ray_df.memory_usage(deep=False).sum() def test_merge(): diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index c4276d88a..458935dd6 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -72,8 +72,9 @@ def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None): row_partitions.append(top) temp_df = temp_df[row_chunksize:] else: - temp_df.reset_index(drop=True, inplace=True) - temp_df.columns = pd.RangeIndex(0, len(temp_df.columns)) + if len(df) > row_chunksize: + temp_df.reset_index(drop=True, inplace=True) + temp_df.columns = pd.RangeIndex(0, len(temp_df.columns)) row_partitions.append(ray.put(temp_df)) return row_partitions @@ -160,8 +161,9 @@ def _map_partitions(func, partitions, *argslists): def _build_columns(df_col, columns): """Build columns and compute lengths for each partition.""" # Columns and width - widths = ray.get([_deploy_func.remote(lambda df: len(df.columns), d) - for d in df_col]) + widths = np.array(ray.get([_deploy_func.remote(lambda df: len(df.columns), + d) + for d in df_col])) dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(widths)) for p_sub_idx in range(widths[p_idx])] @@ -175,8 +177,8 @@ def _build_columns(df_col, columns): def _build_index(df_row, index): """Build index and compute lengths for each partition.""" # Rows and length - lengths = ray.get([_deploy_func.remote(_get_lengths, d) - for d in df_row]) + lengths = np.array(ray.get([_deploy_func.remote(_get_lengths, d) + for d in df_row])) dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(lengths)) for p_sub_idx in range(lengths[p_idx])]