From ae17ebd03289a35ae42c0eb893c6e301920f46f5 Mon Sep 17 00:00:00 2001 From: Peter Schafhalter Date: Thu, 17 May 2018 14:35:17 -0700 Subject: [PATCH] [DataFrame] Implement to_csv (#2014) * Add map, reduce, merge_dtypes bug fixes Unify dtypes on DataFrame creation Formatting and comments Cache dtypes Fix bug in _merge_dtypes Fix bug Changed caching logic Fix dtypes issue in read_csv Invalidate dtypes cache when inserting column Simplify unifying dtypes and improve caching Fix typo Better caching of dtypes Fix merge conflicts Implemented some to_csv functions Support read_csv from buffers Expose date_range, NaT, Timedelta from pandas Add testing utils Redirect imports to Pandas Fix imports Fix read_csv when index_col is specified Update imports from Pandas Fix bugs Use util API Fix nasty bug Add missing import Don't distribute reading of compressed files Add test utilities for Pandas tests Add test for to_csv Add warnings Fix rebase artifacts * Fix rebase artifacts * Fix bugs in read_csv indexing * Fix bugs in read_csv * Fix bug for IndexMetadata with _length 1 Remove testing imports * Rebase artifacts and formatting * Start to_csv without CSV formatter * Fix bug in _map_partitions * Initial implementation for improved to_csv * Fix bug with insert * Bugfixes * Remove CSV Formatter * Formatting * Fix python2 bug * Fix additional python2 issue --- python/ray/dataframe/__init__.py | 4 +- python/ray/dataframe/dataframe.py | 110 ++++++++++++++++++++----- python/ray/dataframe/index_metadata.py | 5 +- python/ray/dataframe/io.py | 79 ++++++++++++++---- python/ray/dataframe/utils.py | 4 +- 5 files changed, 161 insertions(+), 41 deletions(-) diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 6e91a3c66..f59805e21 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -9,7 +9,7 @@ from pandas import (eval, unique, value_counts, cut, to_numeric, factorize, test, qcut, match, Panel, date_range, Index, MultiIndex, CategoricalIndex, Series, bdate_range, DatetimeIndex, Timedelta, Timestamp, to_timedelta, set_eng_float_format, - set_option, NaT) + set_option, NaT, PeriodIndex, Categorical) import threading pd_version = pd.__version__ @@ -49,7 +49,7 @@ __all__ = [ "match", "to_datetime", "get_dummies", "Panel", "date_range", "Index", "MultiIndex", "Series", "bdate_range", "DatetimeIndex", "to_timedelta", "set_eng_float_format", "set_option", "CategoricalIndex", "Timedelta", - "Timestamp", "NaT" + "Timestamp", "NaT", "PeriodIndex", "Categorical" ] try: diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index a178997b0..77dd64e31 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -61,7 +61,8 @@ class DataFrame(object): data (numpy ndarray (structured or homogeneous) or dict): Dict can contain Series, arrays, constants, or list-like objects. - index (pandas.Index or list): The row index for this dataframe. + index (pandas.Index, list, ObjectID): The row index for this + dataframe. columns (pandas.Index): The column names for this dataframe, in pandas Index object. dtype: Data type to force. Only a single dtype is allowed. @@ -1845,7 +1846,8 @@ class DataFrame(object): columns = columns_copy.columns if inplace: - self._update_inplace(row_partitions=new_rows, columns=columns) + self._update_inplace(row_partitions=new_rows, columns=columns, + index=self.index) else: return DataFrame(columns=columns, row_partitions=new_rows) @@ -2330,18 +2332,31 @@ class DataFrame(object): # Deploy insert function to specific column partition, and replace that # column def insert_col_part(df): - df.insert(index_within_partition, column, value, allow_duplicates) + if isinstance(value, pd.Series) and \ + isinstance(value.dtype, + pd.core.dtypes.dtypes.DatetimeTZDtype): + # Need to set index to index of this dtype or inserted values + # become NaT + df.index = value + df.insert(index_within_partition, column, + value, allow_duplicates) + df.index = pd.RangeIndex(0, len(df)) + else: + df.insert(index_within_partition, column, + value, allow_duplicates) return df new_obj = _deploy_func.remote(insert_col_part, self._col_partitions[partition]) + new_cols = [self._col_partitions[i] if i != partition else new_obj for i in range(len(self._col_partitions))] new_col_names = self.columns.insert(loc, column) - self._update_inplace(col_partitions=new_cols, columns=new_col_names) + self._update_inplace(col_partitions=new_cols, columns=new_col_names, + index=self.index) def interpolate(self, method='linear', axis=0, limit=None, inplace=False, limit_direction='forward', downcast=None, **kwargs): @@ -3242,7 +3257,7 @@ class DataFrame(object): self._row_partitions) if inplace: - self._update_inplace(row_partitions=new_rows) + self._update_inplace(row_partitions=new_rows, index=self.index) else: return DataFrame(row_partitions=new_rows, col_metadata=self._col_metadata) @@ -4201,23 +4216,72 @@ class DataFrame(object): port_frame = to_pandas(self) port_frame.to_clipboard(excel, sep, **kwargs) - def to_csv(self, path_or_buf=None, sep=',', na_rep='', float_format=None, + def to_csv(self, path_or_buf=None, sep=",", na_rep="", float_format=None, columns=None, header=True, index=True, index_label=None, - mode='w', encoding=None, compression=None, quoting=None, - quotechar='"', line_terminator='\n', chunksize=None, + mode="w", encoding=None, compression=None, quoting=None, + quotechar='"', line_terminator="\n", chunksize=None, tupleize_cols=None, date_format=None, doublequote=True, - escapechar=None, decimal='.'): + escapechar=None, decimal="."): - warnings.warn("Defaulting to Pandas implementation", - PendingDeprecationWarning) + kwargs = dict( + path_or_buf=path_or_buf, sep=sep, na_rep=na_rep, + float_format=float_format, columns=columns, header=header, + index=index, index_label=index_label, mode=mode, + encoding=encoding, compression=compression, quoting=quoting, + quotechar=quotechar, line_terminator=line_terminator, + chunksize=chunksize, tupleize_cols=tupleize_cols, + date_format=date_format, doublequote=doublequote, + escapechar=escapechar, decimal=decimal + ) - port_frame = to_pandas(self) - port_frame.to_csv(path_or_buf, sep, na_rep, float_format, - columns, header, index, index_label, - mode, encoding, compression, quoting, - quotechar, line_terminator, chunksize, - tupleize_cols, date_format, doublequote, - escapechar, decimal) + if compression is not None: + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + return to_pandas(self).to_csv(**kwargs) + + if tupleize_cols is not None: + warnings.warn("The 'tupleize_cols' parameter is deprecated and " + "will be removed in a future version", + FutureWarning, stacklevel=2) + else: + tupleize_cols = False + + remote_kwargs_id = ray.put(dict(kwargs, path_or_buf=None)) + columns_id = ray.put(self.columns) + + def get_csv_str(df, index, columns, header, kwargs): + df.index = index + df.columns = columns + kwargs["header"] = header + return df.to_csv(**kwargs) + + idxs = [0] + np.cumsum(self._row_metadata._lengths).tolist() + idx_args = [self.index[idxs[i]:idxs[i+1]] + for i in range(len(self._row_partitions))] + csv_str_ids = _map_partitions( + get_csv_str, self._row_partitions, idx_args, + [columns_id] * len(self._row_partitions), + [header] + [False] * (len(self._row_partitions) - 1), + [remote_kwargs_id] * len(self._row_partitions)) + + if path_or_buf is None: + buf = io.StringIO() + elif isinstance(path_or_buf, str): + buf = open(path_or_buf, mode) + else: + buf = path_or_buf + + for csv_str_id in csv_str_ids: + buf.write(ray.get(csv_str_id)) + buf.flush() + + result = None + if path_or_buf is None: + result = buf.getvalue() + buf.close() + elif isinstance(path_or_buf, str): + buf.close() + return result def to_dense(self): raise NotImplementedError( @@ -4668,9 +4732,13 @@ class DataFrame(object): index=index) else: columns = self._col_metadata[key].index - indices_for_rows = \ - [i for i, item in enumerate(self.columns) - if item in set(columns)] + column_indices = {item: i for i, item in enumerate(self.columns)} + indices_for_rows = [column_indices[column] for column in columns] + + def get_columns_partition(df): + result = df.__getitem__(indices_for_rows), + result.columns = pd.RangeIndex(0, len(result.columns)) + return result new_parts = [_deploy_func.remote( lambda df: df.__getitem__(indices_for_rows), diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 11c23d885..2c3407d7a 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -232,7 +232,10 @@ class _IndexMetadata(object): # Determine which partition to place it in, and where in that partition if loc is not None: cum_lens = np.cumsum(self._lengths) - partition = np.digitize(loc, cum_lens[:-1]) + if len(cum_lens) > 1: + partition = np.digitize(loc, cum_lens[:-1], right=True) + else: + partition = 0 if partition >= len(cum_lens): if loc > cum_lens[-1]: raise IndexError("index {0} is out of bounds".format(loc)) diff --git a/python/ray/dataframe/io.py b/python/ray/dataframe/io.py index cf91dbe5d..f29505bef 100644 --- a/python/ray/dataframe/io.py +++ b/python/ray/dataframe/io.py @@ -10,6 +10,8 @@ import warnings from pyarrow.parquet import ParquetFile import pandas as pd +from pandas.io.common import _infer_compression # don't depend on internal API + from .dataframe import ray, DataFrame from . import get_npartitions @@ -82,21 +84,24 @@ def _split_df(pd_df, chunksize): # CSV -def _compute_offset(fn, npartitions): +def _compute_offset(fn, npartitions, ignore_first_line=False): """ Calculate the currect bytes offsets for a csv file. Return a list of (start, end) tuple where the end == \n or EOF. """ total_bytes = os.path.getsize(fn) - chunksize = total_bytes // npartitions + bio = open(fn, 'rb') + if ignore_first_line: + start = len(bio.readline()) + chunksize = (total_bytes - start) // npartitions + else: + start = 0 + chunksize = total_bytes // npartitions if chunksize == 0: chunksize = 1 - bio = open(fn, 'rb') - offsets = [] - start = 0 - while start <= total_bytes: + while start < total_bytes: bio.seek(chunksize, 1) # Move forward {chunksize} bytes extend_line = bio.readline() # Move after the next \n total_offset = chunksize + len(extend_line) @@ -121,15 +126,26 @@ def _infer_column(first_line, kwargs={}): @ray.remote -def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}): +def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''): + kwargs["quoting"] = int(kwargs["quoting"]) # See issue #2078 + bio = open(fn, 'rb') bio.seek(start) to_read = header + bio.read(end - start) bio.close() - return pd.read_csv(BytesIO(to_read), **kwargs) + pd_df = pd.read_csv(BytesIO(to_read), **kwargs) + index = pd_df.index + # Partitions must have RangeIndex + pd_df.index = pd.RangeIndex(0, len(pd_df)) + return pd_df, index -def read_csv(filepath, +@ray.remote +def get_index(*partition_indices): + return partition_indices[0].append(partition_indices[1:]) + + +def read_csv(filepath_or_buffer, sep=',', delimiter=None, header='infer', @@ -247,21 +263,54 @@ def read_csv(filepath, memory_map=memory_map, float_precision=float_precision) - offsets = _compute_offset(filepath, get_npartitions()) + # Default to Pandas read_csv for non-serializable objects + if not isinstance(filepath_or_buffer, str) or \ + _infer_compression(filepath_or_buffer, compression) is not None: + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + pd_obj = pd.read_csv(filepath_or_buffer, **kwargs) + if isinstance(pd_obj, pd.DataFrame): + return from_pandas(pd_obj, get_npartitions()) + + return pd_obj + + filepath = filepath_or_buffer + + # TODO: handle case where header is a list of lines first_line = _get_firstline(filepath) columns = _infer_column(first_line, kwargs=kwargs) + if header is None or (header == "infer" and names is not None): + first_line = b"" + ignore_first_line = False + else: + ignore_first_line = True + + offsets = _compute_offset(filepath, get_npartitions(), + ignore_first_line=ignore_first_line) + + # Serialize objects to speed up later use in remote tasks + first_line_id = ray.put(first_line) + kwargs_id = ray.put(kwargs) df_obj_ids = [] + index_obj_ids = [] for start, end in offsets: if start != 0: - df = _read_csv_with_offset.remote( - filepath, start, end, header=first_line, kwargs=kwargs) + df, index = _read_csv_with_offset._submit( + args=(filepath, start, end, kwargs_id, first_line_id), + num_return_vals=2) else: - df = _read_csv_with_offset.remote( - filepath, start, end, kwargs=kwargs) + df, index = _read_csv_with_offset._submit( + args=(filepath, start, end, kwargs_id), + num_return_vals=2) df_obj_ids.append(df) - return DataFrame(row_partitions=df_obj_ids, columns=columns) + index_obj_ids.append(index) + + index = get_index.remote(*index_obj_ids) if index_col is not None else None + + return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index) def read_json(path_or_buf=None, diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 26a97c2af..0e00d7b86 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -226,8 +226,8 @@ def _map_partitions(func, partitions, *argslists): for part in partitions] else: assert(all([len(args) == len(partitions) for args in argslists])) - return [_deploy_func.remote(func, part, *args) - for part, args in zip(partitions, *argslists)] + return [_deploy_func.remote(func, *args) + for args in zip(partitions, *argslists)] @ray.remote