[DataFrame] Implement IO for ray_df (#1599)

* Add parquet-cpp to gitignore

* Add read_csv and read_parquet

* Gitignore pytest_cache

* Fix flake8

* Add io to __init__

* Changing Index. Currently running tests, but so far untested.

* Removing issue of reassigning DF in from_pandas

* Fixing lint

* Fix bug

* Fix bug

* Fix bug

* Better performance

* Fixing index issue with sum

* Address comments

* Update io with index

* Updating performance and implementation. Adding tests

* Fixing off-by-1

* Fix lint

* Address Comments

* Make pop compatible with new to_pandas

* Format Code

* Cleanup some index issue

* Bug fix: assigned reset_index back

* Remove unused debug line
This commit is contained in:
Simon Mo
2018-02-26 18:26:38 -08:00
committed by Devin Petersohn
parent 87e107edd8
commit d78a22f94c
5 changed files with 427 additions and 17 deletions
+23 -5
View File
@@ -2,9 +2,27 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from .dataframe import DataFrame
from .dataframe import from_pandas
from .dataframe import to_pandas
from .series import Series
DEFAULT_NPARTITIONS = 10
__all__ = ["DataFrame", "from_pandas", "to_pandas", "Series"]
def set_npartition_default(n):
global DEFAULT_NPARTITIONS
DEFAULT_NPARTITIONS = n
def get_npartitions():
return DEFAULT_NPARTITIONS
# We import these file after above two function
# because they depend on npartitions.
from .dataframe import DataFrame # noqa: 402
from .dataframe import from_pandas # noqa: 402
from .dataframe import to_pandas # noqa: 402
from .series import Series # noqa: 402
from .io import (read_csv, read_parquet) # noqa: 402
__all__ = [
"DataFrame", "from_pandas", "to_pandas", "Series", "read_csv",
"read_parquet"
]
+47 -12
View File
@@ -373,16 +373,29 @@ class DataFrame(object):
temp_index = [idx
for _ in range(len(self._df))
for idx in self.columns]
temp_columns = self.index
local_transpose = self._map_partitions(
lambda df: df.transpose(*args, **kwargs), index=temp_index)
local_transpose.columns = temp_columns
# Sum will collapse the NAs from the groupby
return local_transpose.reduce_by_index(
df = local_transpose.reduce_by_index(
lambda df: df.apply(lambda x: x), axis=1)
# Reassign the columns within partition to self.index.
# We have to use _depoly_func instead of _map_partition due to
# new_labels argument
def _reassign_columns(df, new_labels):
df.columns = new_labels
return df
df._df = [
_deploy_func.remote(
_reassign_columns,
part,
self.index) for part in df._df]
return df
T = property(transpose)
def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
@@ -563,9 +576,15 @@ class DataFrame(object):
for _ in range(len(self._df))
for idx in self.columns]
return sum(ray.get(self._map_partitions(lambda df: df.count(
axis=axis, level=level, numeric_only=numeric_only
), index=temp_index)._df))
collapsed_df = sum(
ray.get(
self._map_partitions(
lambda df: df.count(
axis=axis,
level=level,
numeric_only=numeric_only),
index=temp_index)._df))
return collapsed_df
def cov(self, min_periods=None):
raise NotImplementedError("Not Yet implemented.")
@@ -865,7 +884,9 @@ class DataFrame(object):
iters = ray.get([
_deploy_func.remote(
lambda df: list(df.iterrows()), part) for part in self._df])
return itertools.chain.from_iterable(iters)
iters = itertools.chain.from_iterable(iters)
series = map(lambda idx_series_tuple: idx_series_tuple[1], iters)
return zip(self.index, series)
def items(self):
"""Iterator over (column name, Series) pairs.
@@ -884,6 +905,7 @@ class DataFrame(object):
def concat_iters(iterables):
for partitions in zip(*iterables):
series = pd.concat([_series for _, _series in partitions])
series.index = self.index
yield (series.name, series)
return concat_iters(iters)
@@ -919,7 +941,20 @@ class DataFrame(object):
_deploy_func.remote(
lambda df: list(df.itertuples(index=index, name=name)),
part) for part in self._df])
return itertools.chain.from_iterable(iters)
iters = itertools.chain.from_iterable(iters)
def _replace_index(row_tuple, idx):
# We need to use try-except here because
# isinstance(row_tuple, namedtuple) won't work.
try:
row_tuple = row_tuple._replace(Index=idx)
except AttributeError: # Tuple not namedtuple
row_tuple = (idx,) + row_tuple[1:]
return row_tuple
if index:
iters = itertools.starmap(_replace_index, zip(iters, self.index))
return iters
def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
sort=False):
@@ -1100,8 +1135,7 @@ class DataFrame(object):
popped = to_pandas(self._map_partitions(
lambda df: df.pop(item)))
self._df = self._map_partitions(lambda df: df.drop([item], axis=1))._df
self.columns = [col for col in self.columns if col != item]
self.columns = self.columns.drop(item)
return popped
def pow(self, other, axis='columns', level=None, fill_value=None):
@@ -1949,13 +1983,14 @@ def from_pandas(df, npartitions=None, chunksize=None, sort=True):
while len(temp_df) > chunksize:
t_df = temp_df[:chunksize]
lengths.append(len(t_df))
# reindex here because we want a pd.RangeIndex within the partitions.
# It is smaller and sometimes faster.
t_df.reindex()
# reset_index here because we want a pd.RangeIndex
# within the partitions. It is smaller and sometimes faster.
t_df = t_df.reset_index(drop=True)
top = ray.put(t_df)
dataframes.append(top)
temp_df = temp_df[chunksize:]
else:
temp_df = temp_df.reset_index(drop=True)
dataframes.append(ray.put(temp_df))
lengths.append(len(temp_df))
+262
View File
@@ -0,0 +1,262 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from itertools import chain
from io import BytesIO
import os
import re
from pyarrow.parquet import ParquetFile
import pandas as pd
from .dataframe import ray, DataFrame
from . import get_npartitions
# Parquet
def read_parquet(path, engine='auto', columns=None, **kwargs):
"""Load a parquet object from the file path, returning a DataFrame.
Ray DataFrame only supports pyarrow engine for now.
Args:
path: The filepath of the parquet file.
We only support local files for now.
engine: Ray only support pyarrow reader.
This argument doesn't do anything for now.
kwargs: Pass into parquet's read_row_group function.
"""
pf = ParquetFile(path)
n_rows = pf.metadata.num_rows
chunksize = n_rows // get_npartitions()
n_row_groups = pf.metadata.num_row_groups
idx_regex = re.compile('__index_level_\d+__')
columns = [
name for name in pf.metadata.schema.names if not idx_regex.match(name)
]
df_from_row_groups = [
_read_parquet_row_group.remote(path, columns, i, kwargs)
for i in range(n_row_groups)
]
splited_dfs = ray.get(
[_split_df.remote(df, chunksize) for df in df_from_row_groups])
df_remotes = list(chain.from_iterable(splited_dfs))
return DataFrame(df_remotes, columns)
@ray.remote
def _read_parquet_row_group(path, columns, row_group_id, kwargs={}):
"""Read a parquet row_group given file_path.
"""
pf = ParquetFile(path)
df = pf.read_row_group(row_group_id, columns=columns, **kwargs).to_pandas()
return df
@ray.remote
def _split_df(pd_df, chunksize):
"""Split a pd_df into partitions.
Returns:
remote_df_ids ([ObjectID])
"""
dataframes = []
while len(pd_df) > chunksize:
t_df = pd_df[:chunksize]
t_df.reset_index(drop=True)
top = ray.put(t_df)
dataframes.append(top)
pd_df = pd_df[chunksize:]
else:
pd_df = pd_df.reset_index(drop=True)
dataframes.append(ray.put(pd_df))
return dataframes
# CSV
def _compute_offset(fn, npartitions):
"""
Calculate the currect bytes offsets for a csv file.
Return a list of (start, end) tuple where the end == \n or EOF.
"""
total_bytes = os.path.getsize(fn)
chunksize = total_bytes // npartitions
if chunksize == 0:
chunksize = 1
bio = open(fn, 'rb')
offsets = []
start = 0
while start <= total_bytes:
bio.seek(chunksize, 1) # Move forward {chunksize} bytes
extend_line = bio.readline() # Move after the next \n
total_offset = chunksize + len(extend_line)
# The position of the \n we just crossed.
new_line_cursor = start + total_offset - 1
offsets.append((start, new_line_cursor))
start = new_line_cursor + 1
bio.close()
return offsets
def _get_firstline(file_path):
bio = open(file_path, 'rb')
first = bio.readline()
bio.close()
return first
def _infer_column(first_line):
return pd.read_csv(BytesIO(first_line)).columns
@ray.remote
def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}):
bio = open(fn, 'rb')
bio.seek(start)
to_read = header + bio.read(end - start)
bio.close()
return pd.read_csv(BytesIO(to_read), **kwargs)
def read_csv(filepath,
sep=',',
delimiter=None,
header='infer',
names=None,
index_col=None,
usecols=None,
squeeze=False,
prefix=None,
mangle_dupe_cols=True,
dtype=None,
engine=None,
converters=None,
true_values=None,
false_values=None,
skipinitialspace=False,
skiprows=None,
nrows=None,
na_values=None,
keep_default_na=True,
na_filter=True,
verbose=False,
skip_blank_lines=True,
parse_dates=False,
infer_datetime_format=False,
keep_date_col=False,
date_parser=None,
dayfirst=False,
iterator=False,
chunksize=None,
compression='infer',
thousands=None,
decimal=b'.',
lineterminator=None,
quotechar='"',
quoting=0,
escapechar=None,
comment=None,
encoding=None,
dialect=None,
tupleize_cols=None,
error_bad_lines=True,
warn_bad_lines=True,
skipfooter=0,
skip_footer=0,
doublequote=True,
delim_whitespace=False,
as_recarray=None,
compact_ints=None,
use_unsigned=None,
low_memory=True,
buffer_lines=None,
memory_map=False,
float_precision=None):
"""Read csv file from local disk.
Args:
filepath:
The filepath of the csv file.
We only support local files for now.
kwargs: Keyword arguments in pandas::from_csv
"""
kwargs = dict(
sep=sep,
delimiter=delimiter,
header=header,
names=names,
index_col=index_col,
usecols=usecols,
squeeze=squeeze,
prefix=prefix,
mangle_dupe_cols=mangle_dupe_cols,
dtype=dtype,
engine=engine,
converters=converters,
true_values=true_values,
false_values=false_values,
skipinitialspace=skipinitialspace,
skiprows=skiprows,
nrows=nrows,
na_values=na_values,
keep_default_na=keep_default_na,
na_filter=na_filter,
verbose=verbose,
skip_blank_lines=skip_blank_lines,
parse_dates=parse_dates,
infer_datetime_format=infer_datetime_format,
keep_date_col=keep_date_col,
date_parser=date_parser,
dayfirst=dayfirst,
iterator=iterator,
chunksize=chunksize,
compression=compression,
thousands=thousands,
decimal=decimal,
lineterminator=lineterminator,
quotechar=quotechar,
quoting=quoting,
escapechar=escapechar,
comment=comment,
encoding=encoding,
dialect=dialect,
tupleize_cols=tupleize_cols,
error_bad_lines=error_bad_lines,
warn_bad_lines=warn_bad_lines,
skipfooter=skipfooter,
skip_footer=skip_footer,
doublequote=doublequote,
delim_whitespace=delim_whitespace,
as_recarray=as_recarray,
compact_ints=compact_ints,
use_unsigned=use_unsigned,
low_memory=low_memory,
buffer_lines=buffer_lines,
memory_map=memory_map,
float_precision=float_precision)
offsets = _compute_offset(filepath, get_npartitions())
first_line = _get_firstline(filepath)
columns = _infer_column(first_line)
df_obj_ids = []
for start, end in offsets:
if start != 0:
df = _read_csv_with_offset.remote(
filepath, start, end, header=first_line, kwargs=kwargs)
else:
df = _read_csv_with_offset.remote(
filepath, start, end, kwargs=kwargs)
df_obj_ids.append(df)
return DataFrame(df_obj_ids, columns)
+91
View File
@@ -0,0 +1,91 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import numpy as np
import pandas as pd
import ray
import ray.dataframe as rdf
import ray.dataframe.io as io
import os
TEST_PARQUET_FILENAME = 'test.parquet'
TEST_CSV_FILENAME = 'test.csv'
SMALL_ROW_SIZE = 2000
LARGE_ROW_SIZE = 7e6
@pytest.fixture
def ray_df_equals_pandas(ray_df, pandas_df):
return rdf.to_pandas(ray_df).sort_index().equals(pandas_df.sort_index())
@pytest.fixture
def setup_parquet_file(row_size, force=False):
if os.path.exists(TEST_PARQUET_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_parquet(TEST_PARQUET_FILENAME)
@pytest.fixture
def teardown_parquet_file():
if os.path.exists(TEST_PARQUET_FILENAME):
os.remove(TEST_PARQUET_FILENAME)
@pytest.fixture
def setup_csv_file(row_size, force=False):
if os.path.exists(TEST_CSV_FILENAME) and not force:
pass
else:
df = pd.DataFrame({
'col1': np.arange(row_size),
'col2': np.arange(row_size)
})
df.to_csv(TEST_CSV_FILENAME)
@pytest.fixture
def teardown_csv_file():
if os.path.exists(TEST_CSV_FILENAME):
os.remove(TEST_CSV_FILENAME)
def test_from_parquet_small():
ray.init()
setup_parquet_file(SMALL_ROW_SIZE)
pd_df = pd.read_parquet(TEST_PARQUET_FILENAME)
ray_df = io.read_parquet(TEST_PARQUET_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_parquet_file()
def test_from_parquet_large():
setup_parquet_file(LARGE_ROW_SIZE)
pd_df = pd.read_parquet(TEST_PARQUET_FILENAME)
ray_df = io.read_parquet(TEST_PARQUET_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_parquet_file()
def test_from_csv():
setup_csv_file(SMALL_ROW_SIZE)
pd_df = pd.read_csv(TEST_CSV_FILENAME)
ray_df = io.read_csv(TEST_CSV_FILENAME)
assert ray_df_equals_pandas(ray_df, pd_df)
teardown_csv_file()