[DataFrame] Implement to_csv (#2014)

* Add map, reduce, merge_dtypes

bug fixes

Unify dtypes on DataFrame creation

Formatting and comments

Cache dtypes

Fix bug in _merge_dtypes

Fix bug

Changed caching logic

Fix dtypes issue in read_csv

Invalidate dtypes cache when inserting column

Simplify unifying dtypes and improve caching

Fix typo

Better caching of dtypes

Fix merge conflicts

Implemented some to_csv functions

Support read_csv from buffers

Expose date_range, NaT, Timedelta from pandas

Add testing utils

Redirect imports to Pandas

Fix imports

Fix read_csv when index_col is specified

Update imports from Pandas

Fix bugs

Use util API

Fix nasty bug

Add missing import

Don't distribute reading of compressed files

Add test utilities for Pandas tests

Add test for to_csv

Add warnings

Fix rebase artifacts

* Fix rebase artifacts

* Fix bugs in read_csv indexing

* Fix bugs in read_csv

* Fix bug for IndexMetadata with _length 1

Remove testing imports

* Rebase artifacts and formatting

* Start to_csv without CSV formatter

* Fix bug in _map_partitions

* Initial implementation for improved to_csv

* Fix bug with insert

* Bugfixes

* Remove CSV Formatter

* Formatting

* Fix python2 bug

* Fix additional python2 issue
This commit is contained in:
Peter Schafhalter
2018-05-17 14:35:17 -07:00
committed by Devin Petersohn
parent 796864d887
commit ae17ebd032
5 changed files with 161 additions and 41 deletions
+2 -2
View File
@@ -9,7 +9,7 @@ from pandas import (eval, unique, value_counts, cut, to_numeric, factorize,
test, qcut, match, Panel, date_range, Index, MultiIndex,
CategoricalIndex, Series, bdate_range, DatetimeIndex,
Timedelta, Timestamp, to_timedelta, set_eng_float_format,
set_option, NaT)
set_option, NaT, PeriodIndex, Categorical)
import threading
pd_version = pd.__version__
@@ -49,7 +49,7 @@ __all__ = [
"match", "to_datetime", "get_dummies", "Panel", "date_range", "Index",
"MultiIndex", "Series", "bdate_range", "DatetimeIndex", "to_timedelta",
"set_eng_float_format", "set_option", "CategoricalIndex", "Timedelta",
"Timestamp", "NaT"
"Timestamp", "NaT", "PeriodIndex", "Categorical"
]
try:
+89 -21
View File
@@ -61,7 +61,8 @@ class DataFrame(object):
data (numpy ndarray (structured or homogeneous) or dict):
Dict can contain Series, arrays, constants, or list-like
objects.
index (pandas.Index or list): The row index for this dataframe.
index (pandas.Index, list, ObjectID): The row index for this
dataframe.
columns (pandas.Index): The column names for this dataframe, in
pandas Index object.
dtype: Data type to force. Only a single dtype is allowed.
@@ -1845,7 +1846,8 @@ class DataFrame(object):
columns = columns_copy.columns
if inplace:
self._update_inplace(row_partitions=new_rows, columns=columns)
self._update_inplace(row_partitions=new_rows, columns=columns,
index=self.index)
else:
return DataFrame(columns=columns, row_partitions=new_rows)
@@ -2330,18 +2332,31 @@ class DataFrame(object):
# Deploy insert function to specific column partition, and replace that
# column
def insert_col_part(df):
df.insert(index_within_partition, column, value, allow_duplicates)
if isinstance(value, pd.Series) and \
isinstance(value.dtype,
pd.core.dtypes.dtypes.DatetimeTZDtype):
# Need to set index to index of this dtype or inserted values
# become NaT
df.index = value
df.insert(index_within_partition, column,
value, allow_duplicates)
df.index = pd.RangeIndex(0, len(df))
else:
df.insert(index_within_partition, column,
value, allow_duplicates)
return df
new_obj = _deploy_func.remote(insert_col_part,
self._col_partitions[partition])
new_cols = [self._col_partitions[i]
if i != partition
else new_obj
for i in range(len(self._col_partitions))]
new_col_names = self.columns.insert(loc, column)
self._update_inplace(col_partitions=new_cols, columns=new_col_names)
self._update_inplace(col_partitions=new_cols, columns=new_col_names,
index=self.index)
def interpolate(self, method='linear', axis=0, limit=None, inplace=False,
limit_direction='forward', downcast=None, **kwargs):
@@ -3242,7 +3257,7 @@ class DataFrame(object):
self._row_partitions)
if inplace:
self._update_inplace(row_partitions=new_rows)
self._update_inplace(row_partitions=new_rows, index=self.index)
else:
return DataFrame(row_partitions=new_rows,
col_metadata=self._col_metadata)
@@ -4201,23 +4216,72 @@ class DataFrame(object):
port_frame = to_pandas(self)
port_frame.to_clipboard(excel, sep, **kwargs)
def to_csv(self, path_or_buf=None, sep=',', na_rep='', float_format=None,
def to_csv(self, path_or_buf=None, sep=",", na_rep="", float_format=None,
columns=None, header=True, index=True, index_label=None,
mode='w', encoding=None, compression=None, quoting=None,
quotechar='"', line_terminator='\n', chunksize=None,
mode="w", encoding=None, compression=None, quoting=None,
quotechar='"', line_terminator="\n", chunksize=None,
tupleize_cols=None, date_format=None, doublequote=True,
escapechar=None, decimal='.'):
escapechar=None, decimal="."):
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
kwargs = dict(
path_or_buf=path_or_buf, sep=sep, na_rep=na_rep,
float_format=float_format, columns=columns, header=header,
index=index, index_label=index_label, mode=mode,
encoding=encoding, compression=compression, quoting=quoting,
quotechar=quotechar, line_terminator=line_terminator,
chunksize=chunksize, tupleize_cols=tupleize_cols,
date_format=date_format, doublequote=doublequote,
escapechar=escapechar, decimal=decimal
)
port_frame = to_pandas(self)
port_frame.to_csv(path_or_buf, sep, na_rep, float_format,
columns, header, index, index_label,
mode, encoding, compression, quoting,
quotechar, line_terminator, chunksize,
tupleize_cols, date_format, doublequote,
escapechar, decimal)
if compression is not None:
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
return to_pandas(self).to_csv(**kwargs)
if tupleize_cols is not None:
warnings.warn("The 'tupleize_cols' parameter is deprecated and "
"will be removed in a future version",
FutureWarning, stacklevel=2)
else:
tupleize_cols = False
remote_kwargs_id = ray.put(dict(kwargs, path_or_buf=None))
columns_id = ray.put(self.columns)
def get_csv_str(df, index, columns, header, kwargs):
df.index = index
df.columns = columns
kwargs["header"] = header
return df.to_csv(**kwargs)
idxs = [0] + np.cumsum(self._row_metadata._lengths).tolist()
idx_args = [self.index[idxs[i]:idxs[i+1]]
for i in range(len(self._row_partitions))]
csv_str_ids = _map_partitions(
get_csv_str, self._row_partitions, idx_args,
[columns_id] * len(self._row_partitions),
[header] + [False] * (len(self._row_partitions) - 1),
[remote_kwargs_id] * len(self._row_partitions))
if path_or_buf is None:
buf = io.StringIO()
elif isinstance(path_or_buf, str):
buf = open(path_or_buf, mode)
else:
buf = path_or_buf
for csv_str_id in csv_str_ids:
buf.write(ray.get(csv_str_id))
buf.flush()
result = None
if path_or_buf is None:
result = buf.getvalue()
buf.close()
elif isinstance(path_or_buf, str):
buf.close()
return result
def to_dense(self):
raise NotImplementedError(
@@ -4668,9 +4732,13 @@ class DataFrame(object):
index=index)
else:
columns = self._col_metadata[key].index
indices_for_rows = \
[i for i, item in enumerate(self.columns)
if item in set(columns)]
column_indices = {item: i for i, item in enumerate(self.columns)}
indices_for_rows = [column_indices[column] for column in columns]
def get_columns_partition(df):
result = df.__getitem__(indices_for_rows),
result.columns = pd.RangeIndex(0, len(result.columns))
return result
new_parts = [_deploy_func.remote(
lambda df: df.__getitem__(indices_for_rows),
+4 -1
View File
@@ -232,7 +232,10 @@ class _IndexMetadata(object):
# Determine which partition to place it in, and where in that partition
if loc is not None:
cum_lens = np.cumsum(self._lengths)
partition = np.digitize(loc, cum_lens[:-1])
if len(cum_lens) > 1:
partition = np.digitize(loc, cum_lens[:-1], right=True)
else:
partition = 0
if partition >= len(cum_lens):
if loc > cum_lens[-1]:
raise IndexError("index {0} is out of bounds".format(loc))
+64 -15
View File
@@ -10,6 +10,8 @@ import warnings
from pyarrow.parquet import ParquetFile
import pandas as pd
from pandas.io.common import _infer_compression # don't depend on internal API
from .dataframe import ray, DataFrame
from . import get_npartitions
@@ -82,21 +84,24 @@ def _split_df(pd_df, chunksize):
# CSV
def _compute_offset(fn, npartitions):
def _compute_offset(fn, npartitions, ignore_first_line=False):
"""
Calculate the currect bytes offsets for a csv file.
Return a list of (start, end) tuple where the end == \n or EOF.
"""
total_bytes = os.path.getsize(fn)
chunksize = total_bytes // npartitions
bio = open(fn, 'rb')
if ignore_first_line:
start = len(bio.readline())
chunksize = (total_bytes - start) // npartitions
else:
start = 0
chunksize = total_bytes // npartitions
if chunksize == 0:
chunksize = 1
bio = open(fn, 'rb')
offsets = []
start = 0
while start <= total_bytes:
while start < total_bytes:
bio.seek(chunksize, 1) # Move forward {chunksize} bytes
extend_line = bio.readline() # Move after the next \n
total_offset = chunksize + len(extend_line)
@@ -121,15 +126,26 @@ def _infer_column(first_line, kwargs={}):
@ray.remote
def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}):
def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''):
kwargs["quoting"] = int(kwargs["quoting"]) # See issue #2078
bio = open(fn, 'rb')
bio.seek(start)
to_read = header + bio.read(end - start)
bio.close()
return pd.read_csv(BytesIO(to_read), **kwargs)
pd_df = pd.read_csv(BytesIO(to_read), **kwargs)
index = pd_df.index
# Partitions must have RangeIndex
pd_df.index = pd.RangeIndex(0, len(pd_df))
return pd_df, index
def read_csv(filepath,
@ray.remote
def get_index(*partition_indices):
return partition_indices[0].append(partition_indices[1:])
def read_csv(filepath_or_buffer,
sep=',',
delimiter=None,
header='infer',
@@ -247,21 +263,54 @@ def read_csv(filepath,
memory_map=memory_map,
float_precision=float_precision)
offsets = _compute_offset(filepath, get_npartitions())
# Default to Pandas read_csv for non-serializable objects
if not isinstance(filepath_or_buffer, str) or \
_infer_compression(filepath_or_buffer, compression) is not None:
warnings.warn("Defaulting to Pandas implementation",
PendingDeprecationWarning)
pd_obj = pd.read_csv(filepath_or_buffer, **kwargs)
if isinstance(pd_obj, pd.DataFrame):
return from_pandas(pd_obj, get_npartitions())
return pd_obj
filepath = filepath_or_buffer
# TODO: handle case where header is a list of lines
first_line = _get_firstline(filepath)
columns = _infer_column(first_line, kwargs=kwargs)
if header is None or (header == "infer" and names is not None):
first_line = b""
ignore_first_line = False
else:
ignore_first_line = True
offsets = _compute_offset(filepath, get_npartitions(),
ignore_first_line=ignore_first_line)
# Serialize objects to speed up later use in remote tasks
first_line_id = ray.put(first_line)
kwargs_id = ray.put(kwargs)
df_obj_ids = []
index_obj_ids = []
for start, end in offsets:
if start != 0:
df = _read_csv_with_offset.remote(
filepath, start, end, header=first_line, kwargs=kwargs)
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id, first_line_id),
num_return_vals=2)
else:
df = _read_csv_with_offset.remote(
filepath, start, end, kwargs=kwargs)
df, index = _read_csv_with_offset._submit(
args=(filepath, start, end, kwargs_id),
num_return_vals=2)
df_obj_ids.append(df)
return DataFrame(row_partitions=df_obj_ids, columns=columns)
index_obj_ids.append(index)
index = get_index.remote(*index_obj_ids) if index_col is not None else None
return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index)
def read_json(path_or_buf=None,
+2 -2
View File
@@ -226,8 +226,8 @@ def _map_partitions(func, partitions, *argslists):
for part in partitions]
else:
assert(all([len(args) == len(partitions) for args in argslists]))
return [_deploy_func.remote(func, part, *args)
for part, args in zip(partitions, *argslists)]
return [_deploy_func.remote(func, *args)
for args in zip(partitions, *argslists)]
@ray.remote