[DataFrame] Implement df.merge (#1964)

* Begin merge implementation

* Some cleanup

* Continue cleanup

* Allowing merge on index

* Copy dataframes to clear plasma read-only error

* Make some notes, WIP

* Cleaned up code a bit, still need more error checking

* Adding error checking and addressing comments

* Addressing comment

* Adding test

* Addressing rebase artifact

* Fixing indexing bug

* Some minor cleanup
This commit is contained in:
Devin Petersohn
2018-05-01 18:40:53 -07:00
committed by Robert Nishihara
parent 06a0898af7
commit 7c1d569a49
3 changed files with 259 additions and 13 deletions
+162 -6
View File
@@ -17,6 +17,7 @@ from pandas.core.dtypes.common import (
is_numeric_dtype,
is_timedelta64_dtype)
from pandas.core.indexing import check_bool_indexer
from pandas.errors import MergeError
import warnings
import numpy as np
@@ -37,7 +38,9 @@ from .utils import (
_create_block_partitions,
_inherit_docstrings,
_reindex_helper,
_co_op_helper)
_co_op_helper,
_match_partitioning,
_concat_index)
from . import get_npartitions
from .index_metadata import _IndexMetadata
@@ -499,7 +502,7 @@ class DataFrame(object):
self._col_metadata = col_metadata
else:
assert columns is not None, \
"Columns must be passed without col_metadata"
"If col_metadata is None, columns must be passed in"
self._col_metadata = _IndexMetadata(
self._block_partitions[0, :], index=columns, axis=1)
if row_metadata is not None:
@@ -2396,9 +2399,145 @@ class DataFrame(object):
left_index=False, right_index=False, sort=False,
suffixes=('_x', '_y'), copy=True, indicator=False,
validate=None):
raise NotImplementedError(
"To contribute to Pandas on Ray, please visit "
"github.com/ray-project/ray.")
"""Database style join, where common columns in "on" are merged.
Args:
right: The DataFrame to merge against.
how: What type of join to use.
on: The common column name(s) to join on. If None, and left_on and
right_on are also None, will default to all commonly named
columns.
left_on: The column(s) on the left to use for the join.
right_on: The column(s) on the right to use for the join.
left_index: Use the index from the left as the join keys.
right_index: Use the index from the right as the join keys.
sort: Sort the join keys lexicographically in the result.
suffixes: Add this suffix to the common names not in the "on".
copy: Does nothing in our implementation
indicator: Adds a column named _merge to the DataFrame with
metadata from the merge about each row.
validate: Checks if merge is a specific type.
Returns:
A merged Dataframe
"""
if not isinstance(right, DataFrame):
raise ValueError("can not merge DataFrame with instance of type "
"{}".format(type(right)))
args = (how, on, left_on, right_on, left_index, right_index, sort,
suffixes, False, indicator, validate)
left_cols = ray.put(self.columns)
right_cols = ray.put(right.columns)
# This can be put in a remote function because we don't need it until
# the end, and the columns can be built asynchronously. This takes the
# columns defining off the critical path and speeds up the overall
# merge.
new_columns = _merge_columns.remote(left_cols, right_cols, *args)
if on is not None:
if left_on is not None or right_on is not None:
raise MergeError("Can only pass argument \"on\" OR \"left_on\""
" and \"right_on\", not a combination of "
"both.")
if not is_list_like(on):
on = [on]
if next((True for key in on if key not in self), False) or \
next((True for key in on if key not in right), False):
missing_key = \
next((str(key) for key in on if key not in self), "") + \
next((str(key) for key in on if key not in right), "")
raise KeyError(missing_key)
elif right_on is not None or right_index is True:
if left_on is None and left_index is False:
# Note: This is not the same error as pandas, but pandas throws
# a ValueError NoneType has no len(), and I don't think that
# helps enough.
raise TypeError("left_on must be specified or left_index must "
"be true if right_on is specified.")
elif left_on is not None or left_index is True:
if right_on is None and right_index is False:
# Note: See note above about TypeError.
raise TypeError("right_on must be specified or right_index "
"must be true if right_on is specified.")
if left_on is not None:
if not is_list_like(left_on):
left_on = [left_on]
if next((True for key in left_on if key not in self), False):
raise KeyError(next(key for key in left_on
if key not in self))
if right_on is not None:
if not is_list_like(right_on):
right_on = [right_on]
if next((True for key in right_on if key not in right), False):
raise KeyError(next(key for key in right_on
if key not in right))
# There's a small chance that our partitions are already perfect, but
# if it's not, we need to adjust them. We adjust the right against the
# left because the defaults of merge rely on the order of the left. We
# have to push the index down here, so if we're joining on the right's
# index we go ahead and push it down here too.
if not np.array_equal(self._row_metadata._lengths,
right._row_metadata._lengths) or right_index:
repartitioned_right = np.array([_match_partitioning._submit(
args=(df, self._row_metadata._lengths, right.index),
num_return_vals=len(self._row_metadata._lengths))
for df in right._col_partitions]).T
else:
repartitioned_right = right._block_partitions
if not left_index and not right_index:
# Passing None to each call specifies that we don't care about the
# left's index for the join.
left_idx = itertools.repeat(None)
# We only return the index if we need to update it, and that only
# happens when either left_index or right_index is True. We will
# use this value to add the return vals if we are getting an index
# back.
return_index = False
else:
# We build this to push the index down so that we can use it for
# the join.
left_idx = \
(v.index for k, v in
self._row_metadata._coord_df.copy().groupby('partition'))
return_index = True
new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([lambda x, y: x.merge(y, *args),
left_cols, right_cols,
len(self._block_partitions.T), next(left_idx)] +
np.concatenate(obj).tolist()),
num_return_vals=len(self._block_partitions.T) + return_index)
for obj in zip(self._block_partitions,
repartitioned_right)])
if not return_index:
# Default to RangeIndex if left_index and right_index both false.
new_index = None
else:
new_index_parts = new_blocks[:, -1]
new_index = _concat_index.remote(*new_index_parts)
new_blocks = new_blocks[:, :-1]
return DataFrame(block_partitions=new_blocks,
columns=new_columns,
index=new_index)
def min(self, axis=None, skipna=None, level=None, numeric_only=None,
**kwargs):
@@ -4205,7 +4344,7 @@ class DataFrame(object):
new_blocks = \
np.array([_co_op_helper._submit(
args=tuple([func, self.columns, other.columns,
len(part[0])] +
len(part[0]), None] +
np.concatenate(part).tolist()),
num_return_vals=len(part[0]))
for part in copartitions])
@@ -4260,3 +4399,20 @@ class DataFrame(object):
columns=new_column_index,
col_metadata=new_col_metadata,
row_metadata=new_row_metadata)
@ray.remote
def _merge_columns(left_columns, right_columns, *args):
"""Merge two columns to get the correct column names and order.
Args:
left_columns: The columns on the left side of the merge.
right_columns: The columns on the right side of the merge.
args: The arguments for the merge.
Returns:
The columns for the merge operation.
"""
return pd.DataFrame(columns=left_columns, index=[0], dtype='uint8').merge(
pd.DataFrame(columns=right_columns, index=[0], dtype='uint8'),
*args).columns
+50 -3
View File
@@ -2043,10 +2043,57 @@ def test_memory_usage(ray_df):
def test_merge():
ray_df = create_test_dataframe()
ray_df = rdf.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
with pytest.raises(NotImplementedError):
ray_df.merge(None)
pandas_df = pd.DataFrame({"col1": [0, 1, 2, 3], "col2": [4, 5, 6, 7],
"col3": [8, 9, 0, 1], "col4": [2, 4, 5, 6]})
ray_df2 = rdf.DataFrame({"col1": [0, 1, 2], "col2": [1, 5, 6]})
pandas_df2 = pd.DataFrame({"col1": [0, 1, 2], "col2": [1, 5, 6]})
join_types = ["outer", "inner"]
for how in join_types:
# Defaults
ray_result = ray_df.merge(ray_df2, how=how)
pandas_result = pandas_df.merge(pandas_df2, how=how)
ray_df_equals_pandas(ray_result, pandas_result)
# left_on and right_index
ray_result = ray_df.merge(ray_df2, how=how, left_on='col1',
right_index=True)
pandas_result = pandas_df.merge(pandas_df2, how=how, left_on='col1',
right_index=True)
ray_df_equals_pandas(ray_result, pandas_result)
# left_index and right_index
ray_result = ray_df.merge(ray_df2, how=how, left_index=True,
right_index=True)
pandas_result = pandas_df.merge(pandas_df2, how=how, left_index=True,
right_index=True)
ray_df_equals_pandas(ray_result, pandas_result)
# left_index and right_on
ray_result = ray_df.merge(ray_df2, how=how, left_index=True,
right_on='col1')
pandas_result = pandas_df.merge(pandas_df2, how=how, left_index=True,
right_on='col1')
ray_df_equals_pandas(ray_result, pandas_result)
# left_on and right_on col1
ray_result = ray_df.merge(ray_df2, how=how, left_on='col1',
right_on='col1')
pandas_result = pandas_df.merge(pandas_df2, how=how, left_on='col1',
right_on='col1')
ray_df_equals_pandas(ray_result, pandas_result)
# left_on and right_on col2
ray_result = ray_df.merge(ray_df2, how=how, left_on='col2',
right_on='col2')
pandas_result = pandas_df.merge(pandas_df2, how=how, left_on='col2',
right_on='col2')
ray_df_equals_pandas(ray_result, pandas_result)
@pytest.fixture
+47 -4
View File
@@ -313,7 +313,8 @@ def _reindex_helper(old_index, new_index, axis, npartitions, *df):
@ray.remote
def _co_op_helper(func, left_columns, right_columns, left_df_len, *zipped):
def _co_op_helper(func, left_columns, right_columns, left_df_len, left_idx,
*zipped):
"""Copartition operation where two DataFrames must have aligned indexes.
NOTE: This function assumes things are already copartitioned. Requires that
@@ -330,11 +331,53 @@ def _co_op_helper(func, left_columns, right_columns, left_df_len, *zipped):
Returns:
A new set of blocks for the partitioned DataFrame.
"""
left = pd.concat(zipped[:left_df_len], axis=1, copy=False)
left = pd.concat(zipped[:left_df_len], axis=1, copy=False).copy()
left.columns = left_columns
if left_idx is not None:
left.index = left_idx
right = pd.concat(zipped[left_df_len:], axis=1, copy=False)
right = pd.concat(zipped[left_df_len:], axis=1, copy=False).copy()
right.columns = right_columns
new_rows = func(left, right)
return create_blocks_helper(new_rows, left_df_len, 0)
new_blocks = create_blocks_helper(new_rows, left_df_len, 0)
if left_idx is not None:
new_blocks.append(new_rows.index)
return new_blocks
@ray.remote
def _match_partitioning(column_partition, lengths, index):
"""Match the number of rows on each partition. Used in df.merge().
Args:
column_partition: The column partition to change.
lengths: The lengths of each row partition to match to.
index: The index index of the column_partition. This is used to push
down to the inner frame for correctness in the merge.
Returns:
A list of blocks created from this column partition.
"""
partitioned_list = []
columns = column_partition.columns
# We set this because this is the only place we can guarantee correct
# placement. We use it in the case the user wants to join on the index.
column_partition.index = index
for length in lengths:
if len(column_partition) == 0:
partitioned_list.append(pd.DataFrame(columns=columns))
continue
partitioned_list.append(column_partition.iloc[:length, :])
column_partition = column_partition.iloc[length:, :]
return partitioned_list
@ray.remote
def _concat_index(*index_parts):
return index_parts[0].append(index_parts[1:])