From 42cbf801e1922c65d1bc446e1b7513ce4db0b7a9 Mon Sep 17 00:00:00 2001 From: "Siyuan (Ryans) Zhuang" Date: Mon, 3 Feb 2020 16:08:18 -0500 Subject: [PATCH] workaround for python3.5 fast numpy serialization (#6675) --- build.sh | 6 +- python/ray/_raylet.pyx | 5 +- python/ray/cloudpickle/cloudpickle_fast.py | 49 +++ python/ray/serialization.py | 330 ++++----------------- python/ray/worker.py | 1 - 5 files changed, 110 insertions(+), 281 deletions(-) diff --git a/build.sh b/build.sh index 97c16b22a..e710d0510 100755 --- a/build.sh +++ b/build.sh @@ -103,12 +103,12 @@ if [ -z "$SKIP_PYARROW_INSTALL" ]; then fi PYTHON_VERSION=`"$PYTHON_EXECUTABLE" -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}".format(*version))'` -if [[ "$PYTHON_VERSION" == "3.6" || "$PYTHON_VERSION" == "3.7" ]]; then +if [[ "$PYTHON_VERSION" == "3.5" || "$PYTHON_VERSION" == "3.6" || "$PYTHON_VERSION" == "3.7" ]]; then WORK_DIR=`mktemp -d` pushd $WORK_DIR - git clone https://github.com/pitrou/pickle5-backport + git clone https://github.com/suquark/pickle5-backport pushd pickle5-backport - git checkout 5186f9ca4ce55ae530027db196da51e08208a16b + git checkout 43551fbb9add8ac2e8551b96fdaf2fe5a3b5997d "$PYTHON_EXECUTABLE" setup.py bdist_wheel unzip -o dist/*.whl -d "$ROOT_DIR/python/ray/pickle5_files" popd diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 48bd4063f..0c4d7f01e 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -580,10 +580,7 @@ cdef write_serialized_object( (serialized_object.writer).write_to( serialized_object.inband, buf, MEMCOPY_THREADS) else: - buffer = Buffer.make(buf) - stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer)) - stream.set_memcopy_threads(MEMCOPY_THREADS) - serialized_object.serialized_object.write_to(stream) + raise TypeError("Unsupported serialization type.") cdef class CoreWorker: diff --git a/python/ray/cloudpickle/cloudpickle_fast.py b/python/ray/cloudpickle/cloudpickle_fast.py index c26f7610d..465c7d215 100644 --- a/python/ray/cloudpickle/cloudpickle_fast.py +++ b/python/ray/cloudpickle/cloudpickle_fast.py @@ -20,6 +20,8 @@ import sys import types import weakref +import numpy + from .cloudpickle import ( _is_dynamic, _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, _find_imported_submodules, _get_cell_contents, _is_global, _builtin_type, @@ -407,6 +409,44 @@ def _property_reduce(obj): return property, (obj.fget, obj.fset, obj.fdel, obj.__doc__) +def _numpy_ndarray_reduce(array): + # This function is implemented according to 'array_reduce_ex_picklebuffer' + # in numpy C backend. This is a workaround for python3.5 pickling support. + if sys.version_info >= (3, 8): + import pickle + picklebuf_class = pickle.PickleBuffer + elif sys.version_info >= (3, 5): + try: + import pickle5 + picklebuf_class = pickle5.PickleBuffer + except Exception: + raise ImportError("Using pickle protocol 5 requires the pickle5 " + "module for Python >=3.5 and <3.8") + else: + raise ValueError("pickle protocol 5 is not available for Python < 3.5") + # if the array if Fortran-contiguous and not C-contiguous, + # the PickleBuffer instance will hold a view on the transpose + # of the initial array, that is C-contiguous. + if not array.flags.c_contiguous and array.flags.f_contiguous: + order = 'F' + picklebuf_args = array.transpose() + else: + order = 'C' + picklebuf_args = array + try: + buffer = picklebuf_class(picklebuf_args) + except Exception: + # Some arrays may refuse to export a buffer, in which case + # just fall back on regular __reduce_ex__ implementation + # (gh-12745). + return array.__reduce__() + + # Get the _frombuffer() function for reconstruction + import numpy.core.numeric as numeric_mod + from_buffer_func = numeric_mod._frombuffer + return from_buffer_func, (buffer, array.dtype, array.shape, order) + + class CloudPickler(Pickler): """Fast C Pickler extension with additional reducing routines. @@ -487,6 +527,15 @@ class CloudPickler(Pickler): for other types that suffered from type-specific reducers, such as Exceptions. See https://github.com/cloudpipe/cloudpickle/issues/248 """ + + # This is a patch for python3.5 + if isinstance(obj, numpy.ndarray): + if (self.proto < 5 or + (not obj.flags.c_contiguous and not obj.flags.f_contiguous) or + obj.dtype == 'O' or obj.itemsize == 0): + return NotImplemented + return _numpy_ndarray_reduce(obj) + t = type(obj) try: is_anyclass = issubclass(t, type) diff --git a/python/ray/serialization.py b/python/ray/serialization.py index b32453d8b..37f4e9589 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -1,9 +1,7 @@ import hashlib -import io import logging import time -import pyarrow import pyarrow.plasma as plasma import ray.cloudpickle as pickle @@ -15,7 +13,6 @@ from ray.exceptions import ( RayActorError, RayWorkerError, UnreconstructableError, - RAY_EXCEPTION_TYPES, ) from ray._raylet import Pickle5Writer, unpack_pickle5_buffers @@ -65,16 +62,6 @@ class Pickle5SerializedObject(SerializedObject): return self._total_bytes -class ArrowSerializedObject(SerializedObject): - def __init__(self, serialized_object): - super(ArrowSerializedObject, self).__init__(b"") - self.serialized_object = serialized_object - - @property - def total_bytes(self): - return self.serialized_object.total_bytes - - class RawSerializedObject(SerializedObject): def __init__(self, value): super(RawSerializedObject, @@ -137,6 +124,7 @@ class SerializationContext: def __init__(self, worker): self.worker = worker + assert worker.use_pickle self.use_pickle = worker.use_pickle def actor_handle_serializer(obj): @@ -147,175 +135,62 @@ class SerializationContext: new_handle._deserialization_helper(serialized_obj, True) return new_handle - if not worker.use_pickle: - serialization_context = pyarrow.default_serialization_context() - # Tell the serialization context to use the cloudpickle version - # that we ship with Ray. - serialization_context.set_pickle(pickle.dumps, pickle.loads) - pyarrow.register_torch_serialization_handlers( - serialization_context) + self._register_cloudpickle_serializer( + ray.actor.ActorHandle, + custom_serializer=actor_handle_serializer, + custom_deserializer=actor_handle_deserializer) - def id_serializer(obj): - return pickle.dumps(obj) + def id_serializer(obj): + return obj.__reduce__() - def id_deserializer(serialized_obj): - return pickle.loads(serialized_obj) + def id_deserializer(serialized_obj): + return serialized_obj[0](*serialized_obj[1]) - def object_id_serializer(obj): - owner_id = "" - owner_address = "" - if obj.is_direct_call_type(): - worker = ray.worker.get_global_worker() - worker.check_connected() - obj, owner_id, owner_address = ( - worker.core_worker.serialize_and_promote_object_id(obj) - ) - obj = obj.__reduce__() - owner_id = owner_id.__reduce__() if owner_id else owner_id - return pickle.dumps((obj, owner_id, owner_address)) + def object_id_serializer(obj): + owner_id = "" + owner_address = "" + if obj.is_direct_call_type(): + worker = ray.worker.get_global_worker() + worker.check_connected() + obj, owner_id, owner_address = ( + worker.core_worker.serialize_and_promote_object_id(obj)) + obj = id_serializer(obj) + owner_id = id_serializer(owner_id) if owner_id else owner_id + return (obj, owner_id, owner_address) - def object_id_deserializer(serialized_obj): - obj_id, owner_id, owner_address = pickle.loads(serialized_obj) - # NOTE(swang): Must deserialize the object first before asking - # the core worker to resolve the value. This is to make sure - # that the ref count for the ObjectID is greater than 0 by the - # time the core worker resolves the value of the object. - deserialized_object_id = obj_id[0](obj_id[1][0]) - if owner_id: - worker = ray.worker.get_global_worker() - worker.check_connected() - # UniqueIDs are serialized as - # (class name, (unique bytes,)). - worker.core_worker.deserialize_and_register_object_id( - obj_id[1][0], owner_id[1][0], owner_address) - return deserialized_object_id + def object_id_deserializer(serialized_obj): + obj_id, owner_id, owner_address = serialized_obj + # NOTE(swang): Must deserialize the object first before asking + # the core worker to resolve the value. This is to make sure + # that the ref count for the ObjectID is greater than 0 by the + # time the core worker resolves the value of the object. + deserialized_object_id = id_deserializer(obj_id) + if owner_id: + worker = ray.worker.get_global_worker() + worker.check_connected() + # UniqueIDs are serialized as + # (class name, (unique bytes,)). + worker.core_worker.deserialize_and_register_object_id( + obj_id[1][0], owner_id[1][0], owner_address) + return deserialized_object_id - for id_type in ray._raylet._ID_TYPES: - if id_type == ray._raylet.ObjectID: - serialization_context.register_type( - id_type, - "{}.{}".format(id_type.__module__, id_type.__name__), - custom_serializer=object_id_serializer, - custom_deserializer=object_id_deserializer) - else: - serialization_context.register_type( - id_type, - "{}.{}".format(id_type.__module__, id_type.__name__), - custom_serializer=id_serializer, - custom_deserializer=id_deserializer) - - # We register this serializer on each worker instead of calling - # _register_custom_serializer from the driver so that isinstance - # still works. - serialization_context.register_type( - ray.actor.ActorHandle, - "ray.ActorHandle", - pickle=False, - custom_serializer=actor_handle_serializer, - custom_deserializer=actor_handle_deserializer) - self.pyarrow_context = serialization_context - else: - self._register_cloudpickle_serializer( - ray.actor.ActorHandle, - custom_serializer=actor_handle_serializer, - custom_deserializer=actor_handle_deserializer) - - def id_serializer(obj): - return obj.__reduce__() - - def id_deserializer(serialized_obj): - return serialized_obj[0](*serialized_obj[1]) - - def object_id_serializer(obj): - owner_id = "" - owner_address = "" - if obj.is_direct_call_type(): - worker = ray.worker.get_global_worker() - worker.check_connected() - obj, owner_id, owner_address = ( - worker.core_worker.serialize_and_promote_object_id(obj) - ) - obj = id_serializer(obj) - owner_id = id_serializer(owner_id) if owner_id else owner_id - return (obj, owner_id, owner_address) - - def object_id_deserializer(serialized_obj): - obj_id, owner_id, owner_address = serialized_obj - # NOTE(swang): Must deserialize the object first before asking - # the core worker to resolve the value. This is to make sure - # that the ref count for the ObjectID is greater than 0 by the - # time the core worker resolves the value of the object. - deserialized_object_id = id_deserializer(obj_id) - if owner_id: - worker = ray.worker.get_global_worker() - worker.check_connected() - # UniqueIDs are serialized as - # (class name, (unique bytes,)). - worker.core_worker.deserialize_and_register_object_id( - obj_id[1][0], owner_id[1][0], owner_address) - return deserialized_object_id - - for id_type in ray._raylet._ID_TYPES: - if id_type == ray._raylet.ObjectID: - self._register_cloudpickle_serializer( - id_type, object_id_serializer, object_id_deserializer) - else: - self._register_cloudpickle_serializer( - id_type, id_serializer, id_deserializer) - - def initialize(self): - """ Register custom serializers """ - if not self.worker.use_pickle: - for error_cls in RAY_EXCEPTION_TYPES: - self.register_custom_serializer( - error_cls, - use_dict=True, - local=True, - class_id=error_cls.__module__ + ". " + error_cls.__name__, - ) - # Tell Ray to serialize lambdas with pickle. - self.register_custom_serializer( - type(lambda: 0), - use_pickle=True, - local=True, - class_id="lambda") - # Tell Ray to serialize types with pickle. - self.register_custom_serializer( - type(int), use_pickle=True, local=True, class_id="type") - # Tell Ray to serialize RayParameters as dictionaries. This is - # used when passing around actor handles. - self.register_custom_serializer( - ray.signature.RayParameter, - use_dict=True, - local=True, - class_id="ray.signature.RayParameter") - # Tell Ray to serialize StringIO with pickle. We do this because - # Ray's default __dict__ serialization is incorrect for this type - # (the object's __dict__ is empty and therefore doesn't - # contain the full state of the object). - self.register_custom_serializer( - io.StringIO, - use_pickle=True, - local=True, - class_id="io.StringIO") + for id_type in ray._raylet._ID_TYPES: + if id_type == ray._raylet.ObjectID: + self._register_cloudpickle_serializer( + id_type, object_id_serializer, object_id_deserializer) + else: + self._register_cloudpickle_serializer(id_type, id_serializer, + id_deserializer) def _register_cloudpickle_serializer(self, cls, custom_serializer, custom_deserializer): - if pickle.FAST_CLOUDPICKLE_USED: + assert pickle.FAST_CLOUDPICKLE_USED - def _CloudPicklerReducer(obj): - return custom_deserializer, (custom_serializer(obj), ) + def _CloudPicklerReducer(obj): + return custom_deserializer, (custom_serializer(obj), ) - # construct a reducer - pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer - else: - - def _CloudPicklerReducer(_self, obj): - _self.save_reduce( - custom_deserializer, (custom_serializer(obj), ), obj=obj) - - # use a placeholder for 'self' argument - pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer + # construct a reducer + pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer def _deserialize_object(self, data, metadata, object_id): if metadata: @@ -352,15 +227,7 @@ class SerializationContext: "Tried to get object that has been promoted to plasma." assert False, "Unrecognized error type " + str(error_type) elif data: - if self.use_pickle: - raise ValueError("Receiving plasma serialized objects " - "while the serialization context is " - "using pickle5 as the backend.") - try: - # If data is not empty, deserialize the object. - return pyarrow.deserialize(data, self.pyarrow_context) - except pyarrow.DeserializationCallbackError: - raise DeserializationError() + raise ValueError("non-null object should always have metadata") else: # Object isn't available in plasma. This should never be returned # to the user. We should only reach this line if this object was @@ -368,64 +235,10 @@ class SerializationContext: # throws an exception. return plasma.ObjectNotAvailable - def _store_and_register_pyarrow(self, value, depth=100): - """Store an object and attempt to register its class if needed. - - Args: - value: The value to put in the object store. - depth: The maximum number of classes to recursively register. - - Raises: - Exception: An exception is raised if the attempt to serialize the - object fails. - """ - counter = 0 - while True: - if counter == depth: - raise Exception("Ray exceeded the maximum number of classes " - "that it will recursively serialize when " - "attempting to serialize an object of " - "type {}.".format(type(value))) - counter += 1 - try: - return pyarrow.serialize(value, self.pyarrow_context) - except pyarrow.SerializationCallbackError as e: - cls_type = type(e.example_object) - try: - self.register_custom_serializer(cls_type, use_dict=True) - warning_message = ( - "WARNING: Serializing objects of type " - "{} by expanding them as dictionaries " - "of their fields. This behavior may " - "be incorrect in some cases.".format(cls_type)) - logger.debug(warning_message) - except (RayNotDictionarySerializable, CloudPickleError, - pickle.pickle.PicklingError, Exception): - # We also handle generic exceptions here because - # cloudpickle can fail with many different types of errors. - warning_message = ( - "Falling back to serializing {} objects by using " - "pickle. Use `ray.register_custom_serializer({},...)` " - "to provide faster serialization.".format( - cls_type, cls_type)) - try: - self.register_custom_serializer( - cls_type, use_pickle=True) - logger.warning(warning_message) - except (CloudPickleError, ValueError): - self.register_custom_serializer( - cls_type, use_pickle=True, local=True) - warning_message = ("WARNING: Pickling the class {} " - "failed, so we are using pickle " - "and only registering the class " - "locally.".format(cls_type)) - logger.warning(warning_message) - def deserialize_objects(self, data_metadata_pairs, object_ids, error_timeout=10): - pass assert len(data_metadata_pairs) == len(object_ids) start_time = time.time() @@ -473,29 +286,12 @@ class SerializationContext: # that this object can also be read by Java. return RawSerializedObject(value) - if self.worker.use_pickle: - writer = Pickle5Writer() - if ray.cloudpickle.FAST_CLOUDPICKLE_USED: - inband = pickle.dumps( - value, protocol=5, buffer_callback=writer.buffer_callback) - else: - inband = pickle.dumps(value) - return Pickle5SerializedObject(inband, writer) - else: - try: - serialized_value = self._store_and_register_pyarrow(value) - except TypeError: - # TypeError can happen because one of the members of the object - # may not be serializable for cloudpickle. So we need - # these extra fallbacks here to start from the beginning. - # Hopefully the object could have a `__reduce__` method. - self.register_custom_serializer(type(value), use_pickle=True) - logger.warning("WARNING: Serializing the class {} failed, " - "falling back to cloudpickle.".format( - type(value))) - serialized_value = self._store_and_register_pyarrow(value) - - return ArrowSerializedObject(serialized_value) + assert self.worker.use_pickle + assert ray.cloudpickle.FAST_CLOUDPICKLE_USED + writer = Pickle5Writer() + inband = pickle.dumps( + value, protocol=5, buffer_callback=writer.buffer_callback) + return Pickle5SerializedObject(inband, writer) def register_custom_serializer(self, cls, @@ -583,22 +379,10 @@ class SerializationContext: assert isinstance(job_id, JobID) def register_class_for_serialization(worker_info): + assert worker_info["worker"].use_pickle context = worker_info["worker"].get_serialization_context(job_id) - if worker_info["worker"].use_pickle: - context._register_cloudpickle_serializer( - cls, serializer, deserializer) - else: - # TODO(rkn): We need to be more thoughtful about what to do if - # custom serializers have already been registered for - # class_id. In some cases, we may want to use the last - # user-defined serializers and ignore subsequent calls to - # register_custom_serializer that were made by the system. - context.pyarrow_context.register_type( - cls, - class_id, - pickle=use_pickle, - custom_serializer=serializer, - custom_deserializer=deserializer) + context._register_cloudpickle_serializer(cls, serializer, + deserializer) if not local: self.worker.run_function_on_all_workers( diff --git a/python/ray/worker.py b/python/ray/worker.py index 03f1023b8..0dc1b6294 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -212,7 +212,6 @@ class Worker: if job_id not in self.serialization_context_map: self.serialization_context_map[ job_id] = serialization.SerializationContext(self) - self.serialization_context_map[job_id].initialize() return self.serialization_context_map[job_id] def check_connected(self):