[DataFrame] Fixed repr, info, and memory_usage (#1874)

* working with dataframes with too many rows and columns

* repr works for jupyter notebooks now

* added comments and test file

* added repr test file to .travis.yml

* added back ray.dataframe as pd to test file

* fixed pandas importing issues in test file

* getting the front and back of df more efficiently

* only keeping dataframe tests in travis

* fixing numpy array for row and col lengths issue

* doesn't add dimensions if df is small enough

* implemented memory_usage()

* completed memory_usage - still failing 2 tests

* only failing one test for memory_usage

* all repr and dataframes tests passing now

* fixing error related to python2 in info()

* fixing python2 errors

* fixed linting errosr

* using _arithmetic_helper in memory_usage()

* fixed last lint error

* removed testing-specific code

* adding back travis test

* removing extra tests from travis

* re-added concat test

* fixes with new indexing scheme

* code cleanup

* fully working with new indexing scheme

* added tests for info and memory_usage

* removed test file
This commit is contained in:
Omkar Salpekar
2018-04-11 08:07:07 -07:00
committed by Devin Petersohn
parent 806b2c844e
commit a3ddde398c
3 changed files with 196 additions and 43 deletions
+176 -27
View File
@@ -21,6 +21,9 @@ import warnings
import numpy as np
import ray
import itertools
import io
import sys
import re
from .utils import (
_deploy_func,
@@ -85,6 +88,7 @@ class DataFrame(object):
axis = 0
columns = pd_df.columns
index = pd_df.index
self._row_metadata = self._col_metadata = None
else:
# created this invariant to make sure we never have to go into the
# partitions to get the columns
@@ -158,13 +162,16 @@ class DataFrame(object):
def __str__(self):
return repr(self)
def __repr__(self):
if len(self._row_metadata) < 60:
result = repr(to_pandas(self))
return result
def _repr_helper_(self):
if len(self._row_metadata) <= 60 and \
len(self._col_metadata) <= 20:
return to_pandas(self)
def head(df, n):
def head(df, n, get_local_head=False):
"""Compute the head for this without creating a new DataFrame"""
if get_local_head:
return df.head(n)
new_dfs = _map_partitions(lambda df: df.head(n),
df)
@@ -174,8 +181,10 @@ class DataFrame(object):
pd_head.columns = self.columns
return pd_head
def tail(df, n):
def tail(df, n, get_local_tail=False):
"""Compute the tail for this without creating a new DataFrame"""
if get_local_tail:
return df.tail(n)
new_dfs = _map_partitions(lambda df: df.tail(n),
df)
@@ -186,25 +195,91 @@ class DataFrame(object):
pd_tail.columns = self.columns
return pd_tail
def front(df, n):
"""Get first n columns without creating a new Dataframe"""
cum_col_lengths = self._col_metadata._lengths.cumsum()
index = np.argmax(cum_col_lengths >= 10)
pd_front = pd.concat(ray.get(x[:index+1]), axis=1, copy=False)
pd_front = pd_front.iloc[:, :n]
pd_front.index = self.index
pd_front.columns = self.columns[:n]
return pd_front
def back(df, n):
"""Get last n columns without creating a new Dataframe"""
cum_col_lengths = np.flip(self._col_metadata._lengths,
axis=0).cumsum()
index = np.argmax(cum_col_lengths >= 10)
pd_back = pd.concat(ray.get(x[-(index+1):]), axis=1, copy=False)
pd_back = pd_back.iloc[:, -n:]
pd_back.index = self.index
pd_back.columns = self.columns[-n:]
return pd_back
x = self._col_partitions
head = head(x, 30)
tail = tail(x, 30)
get_local_head = False
# Get first and last 10 columns if there are more than 20 columns
if len(self._col_metadata) >= 20:
get_local_head = True
front = front(x, 10)
back = back(x, 10)
col_dots = pd.Series(["..."
for _ in range(len(self.index))])
col_dots.index = self.index
col_dots.name = "..."
x = pd.concat([front, col_dots, back], axis=1)
# If less than 60 rows, x is already in the correct format.
if len(self._row_metadata) < 60:
return x
head = head(x, 30, get_local_head)
tail = tail(x, 30, get_local_head)
# Make the dots in between the head and tail
dots = pd.Series(["..."
for _ in range(self._block_partitions.shape[1])])
dots.index = head.columns
dots.name = "..."
row_dots = pd.Series(["..."
for _ in range(len(head.columns))])
row_dots.index = head.columns
row_dots.name = "..."
# We have to do it this way or convert dots to a dataframe and
# transpose. This seems better.
result = head.append(dots).append(tail)
result = head.append(row_dots).append(tail)
return result
def __repr__(self):
# We use pandas repr so that we match them.
if len(self._row_metadata) <= 60 and \
len(self._col_metadata) <= 20:
return repr(self._repr_helper_())
# The split here is so that we don't repr pandas row lengths.
return repr(result).split("\n\n")[0] + \
"\n\n[{0} rows X {1} columns]".format(len(self.index),
result = self._repr_helper_()
final_result = repr(result).rsplit("\n\n", maxsplit=1)[0] + \
"\n\n[{0} rows x {1} columns]".format(len(self.index),
len(self.columns))
return final_result
def _repr_html_(self):
"""repr function for rendering in Jupyter Notebooks like Pandas
Dataframes.
Returns:
The HTML representation of a Dataframe.
"""
# We use pandas _repr_html_ to get a string of the HTML representation
# of the dataframe.
if len(self._row_metadata) <= 60 and \
len(self._col_metadata) <= 20:
return self._repr_helper_()._repr_html_()
# We split so that we insert our correct dataframe dimensions.
result = self._repr_helper_()._repr_html_()
return result.split('<p>')[0] + \
'<p>{0} rows × {1} columns</p>\n</div>'.format(len(self.index),
len(self.columns))
def _get_index(self):
"""Get the index for this DataFrame.
@@ -258,9 +333,12 @@ class DataFrame(object):
# We use the index to get the internal index.
oid_series = [(oid_series[i], i) for i in range(len(oid_series))]
for df, partition in oid_series:
this_partition = self._col_metadata.partition_series(partition)
df.index = this_partition[this_partition.isin(df.index)].index
if len(oid_series) > 1:
for df, partition in oid_series:
this_partition = \
self._col_metadata.partition_series(partition)
df.index = \
this_partition[this_partition.isin(df.index)].index
result_series = pd.concat([obj[0] for obj in oid_series],
axis=0, copy=False)
@@ -1514,9 +1592,74 @@ class DataFrame(object):
def info(self, verbose=None, buf=None, max_cols=None, memory_usage=None,
null_counts=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def info_helper(df):
output_buffer = io.StringIO()
df.info(verbose=verbose,
buf=output_buffer,
max_cols=max_cols,
memory_usage=memory_usage,
null_counts=null_counts)
return output_buffer.getvalue()
# Combine the per-partition info and split into lines
result = ''.join(ray.get(_map_partitions(info_helper,
self._col_partitions)))
lines = result.split('\n')
# Class denoted in info() output
class_string = '<class \'ray.dataframe.dataframe.DataFrame\'>\n'
# Create the Index info() string by parsing self.index
index_string = self.index.summary() + '\n'
# A column header is needed in the inf() output
col_header = 'Data columns (total {0} columns):\n'.format(
len(self.columns))
# Parse the per-partition values to get the per-column details
# Find all the lines in the output that start with integers
prog = re.compile('^[0-9]+.+')
col_lines = [prog.match(line) for line in lines]
cols = [c.group(0) for c in col_lines if c is not None]
# replace the partition columns names with real column names
columns = ["{0}\t{1}\n".format(self.columns[i],
cols[i].split(" ", 1)[1])
for i in range(len(cols))]
col_string = ''.join(columns) + '\n'
# A summary of the dtypes in the dataframe
dtypes_string = "dtypes: "
for dtype, count in self.dtypes.value_counts().iteritems():
dtypes_string += "{0}({1}),".format(dtype, count)
dtypes_string = dtypes_string[:-1] + '\n'
# Compute the memory usage by summing per-partitions return values
# Parse lines for memory usage number
prog = re.compile('^memory+.+')
mems = [prog.match(line) for line in lines]
mem_vals = [float(re.search(r'\d+', m.group(0)).group())
for m in mems if m is not None]
memory_string = ""
if len(mem_vals) != 0:
# Sum memory usage from each partition
if memory_usage != 'deep':
memory_string = 'memory usage: {0}+ bytes'.format(
sum(mem_vals))
else:
memory_string = 'memory usage: {0} bytes'.format(sum(mem_vals))
# Combine all the components of the info() output
result = ''.join([class_string, index_string, col_header,
col_string, dtypes_string, memory_string])
# Write to specified output buffer
if buf:
buf.write(result)
else:
sys.stdout.write(result)
def insert(self, loc, column, value, allow_duplicates=False):
"""Insert column into DataFrame at specified location.
@@ -1552,9 +1695,6 @@ class DataFrame(object):
df.insert(index_within_partition, column, value, allow_duplicates)
return df
print('partition:', partition)
print('i_w_partition', index_within_partition)
print('df:\n', ray.get(self._col_partitions[partition]))
new_obj = _deploy_func.remote(insert_col_part,
self._col_partitions[partition])
new_cols = [self._col_partitions[i]
@@ -1781,9 +1921,18 @@ class DataFrame(object):
"github.com/ray-project/ray.")
def memory_usage(self, index=True, deep=False):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
def remote_func(df):
return df.memory_usage(index=False, deep=deep)
result = self._arithmetic_helper(remote_func, axis=0)
result.index = self.columns
if index:
index_value = self._row_metadata.index.memory_usage(deep=deep)
return pd.Series(index_value, index=['Index']).append(result)
return result
def merge(self, right, how='inner', on=None, left_on=None, right_on=None,
left_index=False, right_index=False, sort=False,
+12 -10
View File
@@ -1648,11 +1648,12 @@ def test_infer_objects():
ray_df.infer_objects()
def test_info():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.info()
@pytest.fixture
def test_info(ray_df):
info_string = ray_df.info()
assert '<class \'ray.dataframe.dataframe.DataFrame\'>\n' in info_string
info_string = ray_df.info(memory_usage=True)
assert 'memory_usage: ' in info_string
@pytest.fixture
@@ -1815,11 +1816,12 @@ def test_melt():
ray_df.melt()
def test_memory_usage():
ray_df = create_test_dataframe()
with pytest.raises(NotImplementedError):
ray_df.memory_usage()
@pytest.fixture
def test_memory_usage(ray_df):
assert type(ray_df.memory_usage()) is pd.core.series.Series
assert ray_df.memory_usage(index=True).at['Index'] is not None
assert ray_df.memory_usage(deep=True).sum() >= \
ray_df.memory_usage(deep=False).sum()
def test_merge():
+8 -6
View File
@@ -72,8 +72,9 @@ def _partition_pandas_dataframe(df, num_partitions=None, row_chunksize=None):
row_partitions.append(top)
temp_df = temp_df[row_chunksize:]
else:
temp_df.reset_index(drop=True, inplace=True)
temp_df.columns = pd.RangeIndex(0, len(temp_df.columns))
if len(df) > row_chunksize:
temp_df.reset_index(drop=True, inplace=True)
temp_df.columns = pd.RangeIndex(0, len(temp_df.columns))
row_partitions.append(ray.put(temp_df))
return row_partitions
@@ -160,8 +161,9 @@ def _map_partitions(func, partitions, *argslists):
def _build_columns(df_col, columns):
"""Build columns and compute lengths for each partition."""
# Columns and width
widths = ray.get([_deploy_func.remote(lambda df: len(df.columns), d)
for d in df_col])
widths = np.array(ray.get([_deploy_func.remote(lambda df: len(df.columns),
d)
for d in df_col]))
dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(widths))
for p_sub_idx in range(widths[p_idx])]
@@ -175,8 +177,8 @@ def _build_columns(df_col, columns):
def _build_index(df_row, index):
"""Build index and compute lengths for each partition."""
# Rows and length
lengths = ray.get([_deploy_func.remote(_get_lengths, d)
for d in df_row])
lengths = np.array(ray.get([_deploy_func.remote(_get_lengths, d)
for d in df_row]))
dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(lengths))
for p_sub_idx in range(lengths[p_idx])]