mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:53:14 +08:00
[DataFrame] Improve performance of iteration methods (#2026)
* fix iterrows * make iteration methods performant * resolving comments * remove indexing from iterator * switch to iterator syntax
This commit is contained in:
committed by
Devin Petersohn
parent
ae17ebd032
commit
afbb260ca4
@@ -46,6 +46,7 @@ from .utils import (
|
||||
_correct_column_dtypes)
|
||||
from . import get_npartitions
|
||||
from .index_metadata import _IndexMetadata
|
||||
from .iterator import PartitionIterator
|
||||
|
||||
|
||||
@_inherit_docstrings(pd.DataFrame)
|
||||
@@ -2368,50 +2369,53 @@ class DataFrame(object):
|
||||
"""Iterate over DataFrame rows as (index, Series) pairs.
|
||||
|
||||
Note:
|
||||
Generators can't be pickeled so from the remote function
|
||||
Generators can't be pickled so from the remote function
|
||||
we expand the generator into a list before getting it.
|
||||
This is not that ideal.
|
||||
|
||||
Returns:
|
||||
A generator that iterates over the rows of the frame.
|
||||
"""
|
||||
def update_iterrow(series, i):
|
||||
"""Helper function to correct the columns + name of the Series."""
|
||||
series.index = self.columns
|
||||
series.name = list(self.index)[i]
|
||||
return series
|
||||
index_iter = (self._row_metadata.partition_series(i).index
|
||||
for i in range(len(self._row_partitions)))
|
||||
|
||||
iters = ray.get([_deploy_func.remote(
|
||||
lambda df: list(df.iterrows()), part)
|
||||
for part in self._row_partitions])
|
||||
iters = itertools.chain.from_iterable(iters)
|
||||
series = map(lambda s: update_iterrow(s[1][1], s[0]), enumerate(iters))
|
||||
def iterrow_helper(part):
|
||||
df = ray.get(part)
|
||||
df.columns = self.columns
|
||||
df.index = next(index_iter)
|
||||
return df.iterrows()
|
||||
|
||||
return zip(self.index, series)
|
||||
partition_iterator = PartitionIterator(self._row_partitions,
|
||||
iterrow_helper)
|
||||
|
||||
for v in partition_iterator:
|
||||
yield v
|
||||
|
||||
def items(self):
|
||||
"""Iterator over (column name, Series) pairs.
|
||||
|
||||
Note:
|
||||
Generators can't be pickeled so from the remote function
|
||||
Generators can't be pickled so from the remote function
|
||||
we expand the generator into a list before getting it.
|
||||
This is not that ideal.
|
||||
|
||||
Returns:
|
||||
A generator that iterates over the columns of the frame.
|
||||
"""
|
||||
iters = ray.get([_deploy_func.remote(
|
||||
lambda df: list(df.items()), part)
|
||||
for part in self._row_partitions])
|
||||
col_iter = (self._col_metadata.partition_series(i).index
|
||||
for i in range(len(self._col_partitions)))
|
||||
|
||||
def concat_iters(iterables):
|
||||
for partitions in enumerate(zip(*iterables)):
|
||||
series = pd.concat([_series for _, _series in partitions[1]])
|
||||
series.index = self.index
|
||||
series.name = list(self.columns)[partitions[0]]
|
||||
yield (series.name, series)
|
||||
def items_helper(part):
|
||||
df = ray.get(part)
|
||||
df.columns = next(col_iter)
|
||||
df.index = self.index
|
||||
return df.items()
|
||||
|
||||
return concat_iters(iters)
|
||||
partition_iterator = PartitionIterator(self._col_partitions,
|
||||
items_helper)
|
||||
|
||||
for v in partition_iterator:
|
||||
yield v
|
||||
|
||||
def iteritems(self):
|
||||
"""Iterator over (column name, Series) pairs.
|
||||
@@ -2433,31 +2437,27 @@ class DataFrame(object):
|
||||
name (string, default "Pandas"): The name of the returned
|
||||
namedtuples or None to return regular tuples.
|
||||
Note:
|
||||
Generators can't be pickeled so from the remote function
|
||||
Generators can't be pickled so from the remote function
|
||||
we expand the generator into a list before getting it.
|
||||
This is not that ideal.
|
||||
|
||||
Returns:
|
||||
A tuple representing row data. See args for varying tuples.
|
||||
"""
|
||||
iters = ray.get([
|
||||
_deploy_func.remote(
|
||||
lambda df: list(df.itertuples(index=index, name=name)),
|
||||
part) for part in self._row_partitions])
|
||||
iters = itertools.chain.from_iterable(iters)
|
||||
index_iter = (self._row_metadata.partition_series(i).index
|
||||
for i in range(len(self._row_partitions)))
|
||||
|
||||
def _replace_index(row_tuple, idx):
|
||||
# We need to use try-except here because
|
||||
# isinstance(row_tuple, namedtuple) won't work.
|
||||
try:
|
||||
row_tuple = row_tuple._replace(Index=idx)
|
||||
except AttributeError: # Tuple not namedtuple
|
||||
row_tuple = (idx,) + row_tuple[1:]
|
||||
return row_tuple
|
||||
def itertuples_helper(part):
|
||||
df = ray.get(part)
|
||||
df.columns = self.columns
|
||||
df.index = next(index_iter)
|
||||
return df.itertuples(index=index, name=name)
|
||||
|
||||
if index:
|
||||
iters = itertools.starmap(_replace_index, zip(iters, self.index))
|
||||
return iters
|
||||
partition_iterator = PartitionIterator(self._row_partitions,
|
||||
itertuples_helper)
|
||||
|
||||
for v in partition_iterator:
|
||||
yield v
|
||||
|
||||
def join(self, other, on=None, how='left', lsuffix='', rsuffix='',
|
||||
sort=False):
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
from collections import Iterator
|
||||
|
||||
|
||||
class PartitionIterator(Iterator):
|
||||
def __init__(self, partitions, func):
|
||||
"""PartitionIterator class to define a generator on partitioned data
|
||||
|
||||
Args:
|
||||
partitions ([ObjectID]): Partitions to iterate over
|
||||
func (callable): The function to get inner iterables from
|
||||
each partition
|
||||
"""
|
||||
self.partitions = iter(partitions)
|
||||
self.func = func
|
||||
self.iter_cache = iter([])
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
return self.next()
|
||||
|
||||
def next(self):
|
||||
try:
|
||||
return next(self.iter_cache)
|
||||
except StopIteration:
|
||||
next_partition = next(self.partitions)
|
||||
self.iter_cache = self.func(next_partition)
|
||||
return self.next()
|
||||
Reference in New Issue
Block a user