diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index b96c4c836..9cbc98ca3 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -17,6 +17,7 @@ from pandas.core.dtypes.common import ( is_numeric_dtype, is_timedelta64_dtype) from pandas.core.indexing import check_bool_indexer +from pandas.errors import MergeError import warnings import numpy as np @@ -37,7 +38,9 @@ from .utils import ( _create_block_partitions, _inherit_docstrings, _reindex_helper, - _co_op_helper) + _co_op_helper, + _match_partitioning, + _concat_index) from . import get_npartitions from .index_metadata import _IndexMetadata @@ -499,7 +502,7 @@ class DataFrame(object): self._col_metadata = col_metadata else: assert columns is not None, \ - "Columns must be passed without col_metadata" + "If col_metadata is None, columns must be passed in" self._col_metadata = _IndexMetadata( self._block_partitions[0, :], index=columns, axis=1) if row_metadata is not None: @@ -2396,9 +2399,145 @@ class DataFrame(object): left_index=False, right_index=False, sort=False, suffixes=('_x', '_y'), copy=True, indicator=False, validate=None): - raise NotImplementedError( - "To contribute to Pandas on Ray, please visit " - "github.com/ray-project/ray.") + """Database style join, where common columns in "on" are merged. + + Args: + right: The DataFrame to merge against. + how: What type of join to use. + on: The common column name(s) to join on. If None, and left_on and + right_on are also None, will default to all commonly named + columns. + left_on: The column(s) on the left to use for the join. + right_on: The column(s) on the right to use for the join. + left_index: Use the index from the left as the join keys. + right_index: Use the index from the right as the join keys. + sort: Sort the join keys lexicographically in the result. + suffixes: Add this suffix to the common names not in the "on". + copy: Does nothing in our implementation + indicator: Adds a column named _merge to the DataFrame with + metadata from the merge about each row. + validate: Checks if merge is a specific type. + + Returns: + A merged Dataframe + """ + + if not isinstance(right, DataFrame): + raise ValueError("can not merge DataFrame with instance of type " + "{}".format(type(right))) + + args = (how, on, left_on, right_on, left_index, right_index, sort, + suffixes, False, indicator, validate) + + left_cols = ray.put(self.columns) + right_cols = ray.put(right.columns) + + # This can be put in a remote function because we don't need it until + # the end, and the columns can be built asynchronously. This takes the + # columns defining off the critical path and speeds up the overall + # merge. + new_columns = _merge_columns.remote(left_cols, right_cols, *args) + + if on is not None: + if left_on is not None or right_on is not None: + raise MergeError("Can only pass argument \"on\" OR \"left_on\"" + " and \"right_on\", not a combination of " + "both.") + if not is_list_like(on): + on = [on] + + if next((True for key in on if key not in self), False) or \ + next((True for key in on if key not in right), False): + + missing_key = \ + next((str(key) for key in on if key not in self), "") + \ + next((str(key) for key in on if key not in right), "") + raise KeyError(missing_key) + + elif right_on is not None or right_index is True: + if left_on is None and left_index is False: + # Note: This is not the same error as pandas, but pandas throws + # a ValueError NoneType has no len(), and I don't think that + # helps enough. + raise TypeError("left_on must be specified or left_index must " + "be true if right_on is specified.") + + elif left_on is not None or left_index is True: + if right_on is None and right_index is False: + # Note: See note above about TypeError. + raise TypeError("right_on must be specified or right_index " + "must be true if right_on is specified.") + + if left_on is not None: + if not is_list_like(left_on): + left_on = [left_on] + + if next((True for key in left_on if key not in self), False): + raise KeyError(next(key for key in left_on + if key not in self)) + + if right_on is not None: + if not is_list_like(right_on): + right_on = [right_on] + + if next((True for key in right_on if key not in right), False): + raise KeyError(next(key for key in right_on + if key not in right)) + + # There's a small chance that our partitions are already perfect, but + # if it's not, we need to adjust them. We adjust the right against the + # left because the defaults of merge rely on the order of the left. We + # have to push the index down here, so if we're joining on the right's + # index we go ahead and push it down here too. + if not np.array_equal(self._row_metadata._lengths, + right._row_metadata._lengths) or right_index: + + repartitioned_right = np.array([_match_partitioning._submit( + args=(df, self._row_metadata._lengths, right.index), + num_return_vals=len(self._row_metadata._lengths)) + for df in right._col_partitions]).T + else: + repartitioned_right = right._block_partitions + + if not left_index and not right_index: + # Passing None to each call specifies that we don't care about the + # left's index for the join. + left_idx = itertools.repeat(None) + + # We only return the index if we need to update it, and that only + # happens when either left_index or right_index is True. We will + # use this value to add the return vals if we are getting an index + # back. + return_index = False + else: + # We build this to push the index down so that we can use it for + # the join. + left_idx = \ + (v.index for k, v in + self._row_metadata._coord_df.copy().groupby('partition')) + return_index = True + + new_blocks = \ + np.array([_co_op_helper._submit( + args=tuple([lambda x, y: x.merge(y, *args), + left_cols, right_cols, + len(self._block_partitions.T), next(left_idx)] + + np.concatenate(obj).tolist()), + num_return_vals=len(self._block_partitions.T) + return_index) + for obj in zip(self._block_partitions, + repartitioned_right)]) + + if not return_index: + # Default to RangeIndex if left_index and right_index both false. + new_index = None + else: + new_index_parts = new_blocks[:, -1] + new_index = _concat_index.remote(*new_index_parts) + new_blocks = new_blocks[:, :-1] + + return DataFrame(block_partitions=new_blocks, + columns=new_columns, + index=new_index) def min(self, axis=None, skipna=None, level=None, numeric_only=None, **kwargs): @@ -4205,7 +4344,7 @@ class DataFrame(object): new_blocks = \ np.array([_co_op_helper._submit( args=tuple([func, self.columns, other.columns, - len(part[0])] + + len(part[0]), None] + np.concatenate(part).tolist()), num_return_vals=len(part[0])) for part in copartitions]) @@ -4260,3 +4399,20 @@ class DataFrame(object): columns=new_column_index, col_metadata=new_col_metadata, row_metadata=new_row_metadata) + + +@ray.remote +def _merge_columns(left_columns, right_columns, *args): + """Merge two columns to get the correct column names and order. + + Args: + left_columns: The columns on the left side of the merge. + right_columns: The columns on the right side of the merge. + args: The arguments for the merge. + + Returns: + The columns for the merge operation. + """ + return pd.DataFrame(columns=left_columns, index=[0], dtype='uint8').merge( + pd.DataFrame(columns=right_columns, index=[0], dtype='uint8'), + *args).columns diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 60d2862d9..beea6078f 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -2043,10 +2043,57 @@ def test_memory_usage(ray_df): def test_merge(): - ray_df = create_test_dataframe() + ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) - with pytest.raises(NotImplementedError): - ray_df.merge(None) + pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7], + "col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]}) + + ray_df2 = rdf.DataFrame({"col1": [0, 1, 2], "col2": [1, 5, 6]}) + + pandas_df2 = pd.DataFrame({"col1": [0, 1, 2], "col2": [1, 5, 6]}) + + join_types = ["outer", "inner"] + for how in join_types: + # Defaults + ray_result = ray_df.merge(ray_df2, how=how) + pandas_result = pandas_df.merge(pandas_df2, how=how) + ray_df_equals_pandas(ray_result, pandas_result) + + # left_on and right_index + ray_result = ray_df.merge(ray_df2, how=how, left_on='col1', + right_index=True) + pandas_result = pandas_df.merge(pandas_df2, how=how, left_on='col1', + right_index=True) + ray_df_equals_pandas(ray_result, pandas_result) + + # left_index and right_index + ray_result = ray_df.merge(ray_df2, how=how, left_index=True, + right_index=True) + pandas_result = pandas_df.merge(pandas_df2, how=how, left_index=True, + right_index=True) + ray_df_equals_pandas(ray_result, pandas_result) + + # left_index and right_on + ray_result = ray_df.merge(ray_df2, how=how, left_index=True, + right_on='col1') + pandas_result = pandas_df.merge(pandas_df2, how=how, left_index=True, + right_on='col1') + ray_df_equals_pandas(ray_result, pandas_result) + + # left_on and right_on col1 + ray_result = ray_df.merge(ray_df2, how=how, left_on='col1', + right_on='col1') + pandas_result = pandas_df.merge(pandas_df2, how=how, left_on='col1', + right_on='col1') + ray_df_equals_pandas(ray_result, pandas_result) + + # left_on and right_on col2 + ray_result = ray_df.merge(ray_df2, how=how, left_on='col2', + right_on='col2') + pandas_result = pandas_df.merge(pandas_df2, how=how, left_on='col2', + right_on='col2') + ray_df_equals_pandas(ray_result, pandas_result) @pytest.fixture diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py index 97c166d09..e91dfaf40 100644 --- a/python/ray/dataframe/utils.py +++ b/python/ray/dataframe/utils.py @@ -313,7 +313,8 @@ def _reindex_helper(old_index, new_index, axis, npartitions, *df): @ray.remote -def _co_op_helper(func, left_columns, right_columns, left_df_len, *zipped): +def _co_op_helper(func, left_columns, right_columns, left_df_len, left_idx, + *zipped): """Copartition operation where two DataFrames must have aligned indexes. NOTE: This function assumes things are already copartitioned. Requires that @@ -330,11 +331,53 @@ def _co_op_helper(func, left_columns, right_columns, left_df_len, *zipped): Returns: A new set of blocks for the partitioned DataFrame. """ - left = pd.concat(zipped[:left_df_len], axis=1, copy=False) + left = pd.concat(zipped[:left_df_len], axis=1, copy=False).copy() left.columns = left_columns + if left_idx is not None: + left.index = left_idx - right = pd.concat(zipped[left_df_len:], axis=1, copy=False) + right = pd.concat(zipped[left_df_len:], axis=1, copy=False).copy() right.columns = right_columns new_rows = func(left, right) - return create_blocks_helper(new_rows, left_df_len, 0) + + new_blocks = create_blocks_helper(new_rows, left_df_len, 0) + + if left_idx is not None: + new_blocks.append(new_rows.index) + + return new_blocks + + +@ray.remote +def _match_partitioning(column_partition, lengths, index): + """Match the number of rows on each partition. Used in df.merge(). + + Args: + column_partition: The column partition to change. + lengths: The lengths of each row partition to match to. + index: The index index of the column_partition. This is used to push + down to the inner frame for correctness in the merge. + + Returns: + A list of blocks created from this column partition. + """ + partitioned_list = [] + + columns = column_partition.columns + # We set this because this is the only place we can guarantee correct + # placement. We use it in the case the user wants to join on the index. + column_partition.index = index + for length in lengths: + if len(column_partition) == 0: + partitioned_list.append(pd.DataFrame(columns=columns)) + continue + + partitioned_list.append(column_partition.iloc[:length, :]) + column_partition = column_partition.iloc[length:, :] + return partitioned_list + + +@ray.remote +def _concat_index(*index_parts): + return index_parts[0].append(index_parts[1:])