# Patch summary (explanatory preamble; text before the first "diff --git"
# header is not part of any hunk).
#
# python/ray/worker.py, inside _initialize_serialization():
#   * _serialize_pandas_series(s): wraps the Series in a one-column DataFrame
#     keyed by s.name, runs pyarrow.serialize_pandas on it and returns
#     {'type': 'Series', 'data': <bytes from to_pybytes()>}.
#   * _serialize_pandas_dataframe(df): same payload shape, tagged
#     'DataFrame', built directly from pyarrow.serialize_pandas(df).
#   * _deserialize_callback_pandas(data): calls pyarrow.deserialize_pandas on
#     data['data']; returns the first column for 'Series', the whole frame
#     for 'DataFrame', and raises ValueError(type_) for any other tag.
#   * Registers pd.Series and pd.DataFrame through
#     worker.serialization_context.register_type with the helpers above.
#     The registration sits in try/except ImportError, so it is skipped when
#     pandas cannot be imported.
#   NOTE(review): the Series path round-trips s.name through a DataFrame
#   column label -- confirm that a Series whose name is None (or not a
#   string) comes back with the same name.
#
# src/thirdparty/download_thirdparty.sh:
#   * Moves the pinned arrow checkout from 988338c5... to ee78cdcb....
#     Presumably this revision provides the pandas (de)serialization calls
#     used above -- TODO confirm.
#
# NOTE(review): this patch was received with its line breaks collapsed; the
# line structure and indentation below are reconstructed from the hunk line
# counts and Python syntax -- verify against the original commit before
# applying.
diff --git a/python/ray/worker.py b/python/ray/worker.py
index 97e67ed57..ded6fada6 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -1080,6 +1080,46 @@ def _initialize_serialization(worker=global_worker):
         custom_serializer=default_dict_custom_serializer,
         custom_deserializer=default_dict_custom_deserializer)
 
+    def _serialize_pandas_series(s):
+        import pandas as pd
+        # TODO: serializing Series without extra copy
+        serialized = pyarrow.serialize_pandas(pd.DataFrame({s.name: s}))
+        return {
+            'type': 'Series',
+            'data': serialized.to_pybytes()
+        }
+
+    def _serialize_pandas_dataframe(df):
+        return {
+            'type': 'DataFrame',
+            'data': pyarrow.serialize_pandas(df).to_pybytes()
+        }
+
+    def _deserialize_callback_pandas(data):
+        deserialized = pyarrow.deserialize_pandas(data['data'])
+        type_ = data['type']
+        if type_ == 'Series':
+            return deserialized[deserialized.columns[0]]
+        elif type_ == 'DataFrame':
+            return deserialized
+        else:
+            raise ValueError(type_)
+
+    try:
+        import pandas as pd
+        worker.serialization_context.register_type(
+            pd.Series, 'pandas.Series',
+            custom_serializer=_serialize_pandas_series,
+            custom_deserializer=_deserialize_callback_pandas)
+
+        worker.serialization_context.register_type(
+            pd.DataFrame, 'pandas.DataFrame',
+            custom_serializer=_serialize_pandas_dataframe,
+            custom_deserializer=_deserialize_callback_pandas)
+    except ImportError:
+        # no pandas
+        pass
+
     if worker.mode in [SCRIPT_MODE, SILENT_MODE]:
         # These should only be called on the driver because _register_class
         # will export the class to all of the workers.
diff --git a/src/thirdparty/download_thirdparty.sh b/src/thirdparty/download_thirdparty.sh
index f3f52ca36..121599402 100755
--- a/src/thirdparty/download_thirdparty.sh
+++ b/src/thirdparty/download_thirdparty.sh
@@ -13,4 +13,4 @@ fi
 cd $TP_DIR/arrow
 git fetch origin master
 
-git checkout 988338c544580ffd367a5540f1061dd7b0fccc0e
+git checkout ee78cdcb1c475a05df9cd9de63358e80ba280a63