diff --git a/python/ray/dataframe/__init__.py b/python/ray/dataframe/__init__.py index 5fa837db7..5081df3d3 100644 --- a/python/ray/dataframe/__init__.py +++ b/python/ray/dataframe/__init__.py @@ -3,6 +3,7 @@ from __future__ import division from __future__ import print_function import pandas as pd +from pandas import eval import threading pd_version = pd.__version__ @@ -33,7 +34,7 @@ from .io import (read_csv, read_parquet) # noqa: 402 from .concat import concat # noqa: 402 __all__ = [ - "DataFrame", "Series", "read_csv", "read_parquet", "concat" + "DataFrame", "Series", "read_csv", "read_parquet", "concat", "eval" ] try: diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index 8a51b5c6e..830a7b734 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -349,6 +349,24 @@ class DataFrame(object): result_series.index = self.index return result_series + def _validate_eval_query(self, expr, **kwargs): + """Helper function to check the arguments to eval() and query() + + Args: + expr: The expression to evaluate. This string cannot contain any + Python statements, only Python expressions. + """ + if isinstance(expr, str) and expr is '': + raise ValueError("expr cannot be an empty string") + + if isinstance(expr, str) and '@' in expr: + raise NotImplementedError("Local variables not yet supported in " + "eval.") + + if isinstance(expr, str) and 'not' in expr: + if 'parser' in kwargs and kwargs['parser'] == 'python': + raise NotImplementedError("'Not' nodes are not implemented.") + @property def size(self): """Get the number of elements in the DataFrame. @@ -1253,20 +1271,29 @@ class DataFrame(object): Returns: ndarray, numeric scalar, DataFrame, Series """ + self._validate_eval_query(expr, **kwargs) + columns = self.columns def eval_helper(df): - df = df.copy() df.columns = columns - df.eval(expr, inplace=True, **kwargs) - df.columns = pd.RangeIndex(0, len(df.columns)) - return df + result = df.eval(expr, inplace=False, **kwargs) + # If result is a series, expr was not an assignment expression. + if not isinstance(result, pd.Series): + result.columns = pd.RangeIndex(0, len(result.columns)) + return result inplace = validate_bool_kwarg(inplace, "inplace") new_rows = _map_partitions(eval_helper, self._row_partitions) - # TODO: This doesn't work if the expression is not an assignment - columns_copy = self._col_metadata._coord_df.T.copy() + result_type = ray.get(_deploy_func.remote(lambda df: type(df), + new_rows[0])) + if result_type is pd.Series: + new_series = pd.concat(ray.get(new_rows), axis=0) + new_series.index = self.index + return new_series + + columns_copy = self._col_metadata._coord_df.copy().T columns_copy.eval(expr, inplace=True, **kwargs) columns = columns_copy.columns @@ -2162,9 +2189,8 @@ class DataFrame(object): Returns: A new DataFrame if inplace=False """ - if '@' in expr: - raise NotImplementedError("Local variables not yet supported in " - "query.") + self._validate_eval_query(expr, **kwargs) + columns = self.columns def query_helper(df):