diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 6e91a3c66..f59805e21 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -9,7 +9,7 @@ from pandas import (eval, unique, value_counts, cut, to_numeric, factorize, test, qcut, match, Panel, date_range, Index, MultiIndex, CategoricalIndex, Series, bdate_range, DatetimeIndex, Timedelta, Timestamp, to_timedelta, set_eng_float_format, - set_option, NaT) + set_option, NaT, PeriodIndex, Categorical) import threading pd_version = pd.__version__ @@ -49,7 +49,7 @@ __all__ = [ "match", "to_datetime", "get_dummies", "Panel", "date_range", "Index", "MultiIndex", "Series", "bdate_range", "DatetimeIndex", "to_timedelta", "set_eng_float_format", "set_option", "CategoricalIndex", "Timedelta", - "Timestamp", "NaT" + "Timestamp", "NaT", "PeriodIndex", "Categorical" ] try: diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index a178997b0..77dd64e31 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -61,7 +61,8 @@ class DataFrame(object): data (numpy ndarray (structured or homogeneous) or dict): Dict can contain Series, arrays, constants, or list-like objects. - index (pandas.Index or list): The row index for this dataframe. + index (pandas.Index, list, ObjectID): The row index for this + dataframe. columns (pandas.Index): The column names for this dataframe, in pandas Index object. dtype: Data type to force. Only a single dtype is allowed. 
@@ -1845,7 +1846,8 @@ class DataFrame(object): columns = columns_copy.columns if inplace: - self._update_inplace(row_partitions=new_rows, columns=columns) + self._update_inplace(row_partitions=new_rows, columns=columns, + index=self.index) else: return DataFrame(columns=columns, row_partitions=new_rows) @@ -2330,18 +2332,31 @@ class DataFrame(object): # Deploy insert function to specific column partition, and replace that # column def insert_col_part(df): - df.insert(index_within_partition, column, value, allow_duplicates) + if isinstance(value, pd.Series) and \ + isinstance(value.dtype, + pd.core.dtypes.dtypes.DatetimeTZDtype): + # Need to set index to index of this dtype or inserted values + # become NaT + df.index = value + df.insert(index_within_partition, column, + value, allow_duplicates) + df.index = pd.RangeIndex(0, len(df)) + else: + df.insert(index_within_partition, column, + value, allow_duplicates) return df new_obj = _deploy_func.remote(insert_col_part, self._col_partitions[partition]) + new_cols = [self._col_partitions[i] if i != partition else new_obj for i in range(len(self._col_partitions))] new_col_names = self.columns.insert(loc, column) - self._update_inplace(col_partitions=new_cols, columns=new_col_names) + self._update_inplace(col_partitions=new_cols, columns=new_col_names, + index=self.index) def interpolate(self, method='linear', axis=0, limit=None, inplace=False, limit_direction='forward', downcast=None, **kwargs): @@ -3242,7 +3257,7 @@ class DataFrame(object): self._row_partitions) if inplace: - self._update_inplace(row_partitions=new_rows) + self._update_inplace(row_partitions=new_rows, index=self.index) else: return DataFrame(row_partitions=new_rows, col_metadata=self._col_metadata) @@ -4201,23 +4216,72 @@ class DataFrame(object): port_frame = to_pandas(self) port_frame.to_clipboard(excel, sep, **kwargs) - def to_csv(self, path_or_buf=None, sep=',', na_rep='', float_format=None, + def to_csv(self, path_or_buf=None, sep=",", na_rep="", 
float_format=None, columns=None, header=True, index=True, index_label=None, - mode='w', encoding=None, compression=None, quoting=None, - quotechar='"', line_terminator='\n', chunksize=None, + mode="w", encoding=None, compression=None, quoting=None, + quotechar='"', line_terminator="\n", chunksize=None, tupleize_cols=None, date_format=None, doublequote=True, - escapechar=None, decimal='.'): + escapechar=None, decimal="."): - warnings.warn("Defaulting to Pandas implementation", - PendingDeprecationWarning) + kwargs = dict( + path_or_buf=path_or_buf, sep=sep, na_rep=na_rep, + float_format=float_format, columns=columns, header=header, + index=index, index_label=index_label, mode=mode, + encoding=encoding, compression=compression, quoting=quoting, + quotechar=quotechar, line_terminator=line_terminator, + chunksize=chunksize, tupleize_cols=tupleize_cols, + date_format=date_format, doublequote=doublequote, + escapechar=escapechar, decimal=decimal + ) - port_frame = to_pandas(self) - port_frame.to_csv(path_or_buf, sep, na_rep, float_format, - columns, header, index, index_label, - mode, encoding, compression, quoting, - quotechar, line_terminator, chunksize, - tupleize_cols, date_format, doublequote, - escapechar, decimal) + if compression is not None: + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + return to_pandas(self).to_csv(**kwargs) + + if tupleize_cols is not None: + warnings.warn("The 'tupleize_cols' parameter is deprecated and " + "will be removed in a future version", + FutureWarning, stacklevel=2) + else: + tupleize_cols = False + + remote_kwargs_id = ray.put(dict(kwargs, path_or_buf=None)) + columns_id = ray.put(self.columns) + + def get_csv_str(df, index, columns, header, kwargs): + df.index = index + df.columns = columns + kwargs["header"] = header + return df.to_csv(**kwargs) + + idxs = [0] + np.cumsum(self._row_metadata._lengths).tolist() + idx_args = [self.index[idxs[i]:idxs[i+1]] + for i in 
range(len(self._row_partitions))] + csv_str_ids = _map_partitions( + get_csv_str, self._row_partitions, idx_args, + [columns_id] * len(self._row_partitions), + [header] + [False] * (len(self._row_partitions) - 1), + [remote_kwargs_id] * len(self._row_partitions)) + + if path_or_buf is None: + buf = io.StringIO() + elif isinstance(path_or_buf, str): + buf = open(path_or_buf, mode) + else: + buf = path_or_buf + + for csv_str_id in csv_str_ids: + buf.write(ray.get(csv_str_id)) + buf.flush() + + result = None + if path_or_buf is None: + result = buf.getvalue() + buf.close() + elif isinstance(path_or_buf, str): + buf.close() + return result def to_dense(self): raise NotImplementedError( @@ -4668,9 +4732,13 @@ class DataFrame(object): index=index) else: columns = self._col_metadata[key].index - indices_for_rows = \ - [i for i, item in enumerate(self.columns) - if item in set(columns)] + column_indices = {item: i for i, item in enumerate(self.columns)} + indices_for_rows = [column_indices[column] for column in columns] + + def get_columns_partition(df): + result = df.__getitem__(indices_for_rows) + result.columns = pd.RangeIndex(0, len(result.columns)) + return result new_parts = [_deploy_func.remote( lambda df: df.__getitem__(indices_for_rows), diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 11c23d885..2c3407d7a 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -232,7 +232,10 @@ class _IndexMetadata(object): # Determine which partition to place it in, and where in that partition if loc is not None: cum_lens = np.cumsum(self._lengths) - partition = np.digitize(loc, cum_lens[:-1]) + if len(cum_lens) > 1: + partition = np.digitize(loc, cum_lens[:-1], right=True) + else: + partition = 0 if partition >= len(cum_lens): if loc > cum_lens[-1]: raise IndexError("index {0} is out of bounds".format(loc)) diff --git a/python/ray/dataframe/io.py b/python/ray/dataframe/io.py index
cf91dbe5d..f29505bef 100644 --- a/python/ray/dataframe/io.py +++ b/python/ray/dataframe/io.py @@ -10,6 +10,8 @@ import warnings from pyarrow.parquet import ParquetFile import pandas as pd +from pandas.io.common import _infer_compression # don't depend on internal API + from .dataframe import ray, DataFrame from . import get_npartitions @@ -82,21 +84,24 @@ def _split_df(pd_df, chunksize): # CSV -def _compute_offset(fn, npartitions): +def _compute_offset(fn, npartitions, ignore_first_line=False): """ Calculate the currect bytes offsets for a csv file. Return a list of (start, end) tuple where the end == \n or EOF. """ total_bytes = os.path.getsize(fn) - chunksize = total_bytes // npartitions + bio = open(fn, 'rb') + if ignore_first_line: + start = len(bio.readline()) + chunksize = (total_bytes - start) // npartitions + else: + start = 0 + chunksize = total_bytes // npartitions if chunksize == 0: chunksize = 1 - bio = open(fn, 'rb') - offsets = [] - start = 0 - while start <= total_bytes: + while start < total_bytes: bio.seek(chunksize, 1) # Move forward {chunksize} bytes extend_line = bio.readline() # Move after the next \n total_offset = chunksize + len(extend_line) @@ -121,15 +126,26 @@ def _infer_column(first_line, kwargs={}): @ray.remote -def _read_csv_with_offset(fn, start, end, header=b'', kwargs={}): +def _read_csv_with_offset(fn, start, end, kwargs={}, header=b''): + kwargs["quoting"] = int(kwargs["quoting"]) # See issue #2078 + bio = open(fn, 'rb') bio.seek(start) to_read = header + bio.read(end - start) bio.close() - return pd.read_csv(BytesIO(to_read), **kwargs) + pd_df = pd.read_csv(BytesIO(to_read), **kwargs) + index = pd_df.index + # Partitions must have RangeIndex + pd_df.index = pd.RangeIndex(0, len(pd_df)) + return pd_df, index -def read_csv(filepath, +@ray.remote +def get_index(*partition_indices): + return partition_indices[0].append(partition_indices[1:]) + + +def read_csv(filepath_or_buffer, sep=',', delimiter=None, header='infer', @@ -247,21 
+263,54 @@ def read_csv(filepath, memory_map=memory_map, float_precision=float_precision) - offsets = _compute_offset(filepath, get_npartitions()) + # Default to Pandas read_csv for non-serializable objects + if not isinstance(filepath_or_buffer, str) or \ + _infer_compression(filepath_or_buffer, compression) is not None: + warnings.warn("Defaulting to Pandas implementation", + PendingDeprecationWarning) + + pd_obj = pd.read_csv(filepath_or_buffer, **kwargs) + if isinstance(pd_obj, pd.DataFrame): + return from_pandas(pd_obj, get_npartitions()) + + return pd_obj + + filepath = filepath_or_buffer + + # TODO: handle case where header is a list of lines first_line = _get_firstline(filepath) columns = _infer_column(first_line, kwargs=kwargs) + if header is None or (header == "infer" and names is not None): + first_line = b"" + ignore_first_line = False + else: + ignore_first_line = True + + offsets = _compute_offset(filepath, get_npartitions(), + ignore_first_line=ignore_first_line) + + # Serialize objects to speed up later use in remote tasks + first_line_id = ray.put(first_line) + kwargs_id = ray.put(kwargs) df_obj_ids = [] + index_obj_ids = [] for start, end in offsets: if start != 0: - df = _read_csv_with_offset.remote( - filepath, start, end, header=first_line, kwargs=kwargs) + df, index = _read_csv_with_offset._submit( + args=(filepath, start, end, kwargs_id, first_line_id), + num_return_vals=2) else: - df = _read_csv_with_offset.remote( - filepath, start, end, kwargs=kwargs) + df, index = _read_csv_with_offset._submit( + args=(filepath, start, end, kwargs_id), + num_return_vals=2) df_obj_ids.append(df) - return DataFrame(row_partitions=df_obj_ids, columns=columns) + index_obj_ids.append(index) + + index = get_index.remote(*index_obj_ids) if index_col is not None else None + + return DataFrame(row_partitions=df_obj_ids, columns=columns, index=index) def read_json(path_or_buf=None, diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 
26a97c2af..0e00d7b86 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -226,8 +226,8 @@ def _map_partitions(func, partitions, *argslists): for part in partitions] else: assert(all([len(args) == len(partitions) for args in argslists])) - return [_deploy_func.remote(func, part, *args) - for part, args in zip(partitions, *argslists)] + return [_deploy_func.remote(func, *args) + for args in zip(partitions, *argslists)] @ray.remote