diff --git a/build.sh b/build.sh index c55a4763e..2a061bb3d 100755 --- a/build.sh +++ b/build.sh @@ -102,6 +102,19 @@ if [ -z "$SKIP_PYARROW_INSTALL" ]; then --find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/3a11193d9530fe8ec7fdb98057f853b708f6f6ae/index.html fi +PYTHON_VERSION=`"$PYTHON_EXECUTABLE" -c 'import sys; version=sys.version_info[:3]; print("{0}.{1}".format(*version))'` +if [[ "$PYTHON_VERSION" == "3.6" || "$PYTHON_VERSION" == "3.7" ]]; then + WORK_DIR=`mktemp -d` + pushd $WORK_DIR + git clone https://github.com/pitrou/pickle5-backport + pushd pickle5-backport + git checkout 5186f9ca4ce55ae530027db196da51e08208a16b + "$PYTHON_EXECUTABLE" setup.py bdist_wheel + unzip -o dist/*.whl -d "$ROOT_DIR/python/ray/pickle5_files" + popd + popd +fi + export PYTHON3_BIN_PATH="$PYTHON_EXECUTABLE" export PYTHON2_BIN_PATH="$PYTHON_EXECUTABLE" diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 4d9d9e7ef..a180a67a6 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -5,12 +5,26 @@ from __future__ import print_function import os import sys +# MUST add pickle5 to the import path because it will be imported by some +# raylet modules. + +if "pickle5" in sys.modules: + raise ImportError("Ray must be imported before pickle5 because Ray " + "requires a specific version of pickle5 (which is " + "packaged along with Ray).") + +# Add the directory containing pickle5 to the Python path so that we find the +# pickle5 version packaged with ray and not a pre-existing pickle5. +pickle5_path = os.path.join( + os.path.abspath(os.path.dirname(__file__)), "pickle5_files") +sys.path.insert(0, pickle5_path) + # MUST import ray._raylet before pyarrow to initialize some global variables. # It seems the library related to memory allocation in pyarrow will destroy the # initialization of grpc if we import pyarrow at first. # NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more # details. -import ray._raylet +import ray._raylet # noqa: E402 if "pyarrow" in sys.modules: raise ImportError("Ray must be imported before pyarrow because Ray " diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 3aa7b0613..54df7e6b3 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -654,6 +654,34 @@ cdef void exit_handler() nogil: sys.exit(0) +cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str): + cdef shared_ptr[CBuffer] empty_metadata + if c_str.size() == 0: + return empty_metadata + return dynamic_pointer_cast[CBuffer, LocalMemoryBuffer]( + make_shared[LocalMemoryBuffer]((c_str.data()), + c_str.size(), True)) + + +cdef write_serialized_object(serialized_object, const shared_ptr[CBuffer]& buf): + # avoid initializing pyarrow before raylet + from ray.serialization import Pickle5SerializedObject, RawSerializedObject + + if isinstance(serialized_object, RawSerializedObject): + buffer = Buffer.make(buf) + stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer)) + stream.set_memcopy_threads(MEMCOPY_THREADS) + stream.write(pyarrow.py_buffer(serialized_object.value)) + elif isinstance(serialized_object, Pickle5SerializedObject): + (serialized_object.writer).write_to( + serialized_object.inband, buf, MEMCOPY_THREADS) + else: + buffer = Buffer.make(buf) + stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer)) + stream.set_memcopy_threads(MEMCOPY_THREADS) + serialized_object.serialized_object.write_to(stream) + + cdef class CoreWorker: cdef unique_ptr[CCoreWorker] core_worker @@ -753,66 +781,15 @@ cdef class CoreWorker: CObjectID c_object_id shared_ptr[CBuffer] data shared_ptr[CBuffer] metadata - + metadata = string_to_buffer(serialized_object.metadata) + total_bytes = serialized_object.total_bytes object_already_exists = self._create_put_buffer( - metadata, serialized_object.total_bytes, - object_id, &c_object_id, &data) + metadata, total_bytes, object_id, &c_object_id, &data) if not object_already_exists: - stream = pyarrow.FixedSizeBufferWriter( - pyarrow.py_buffer(Buffer.make(data))) - stream.set_memcopy_threads(MEMCOPY_THREADS) - serialized_object.write_to(stream) - + write_serialized_object(serialized_object, data) with nogil: check_status( self.core_worker.get().Seal(c_object_id)) - - return ObjectID(c_object_id.Binary()) - - def put_raw_buffer(self, c_string value, ObjectID object_id=None): - cdef: - c_string metadata_str = RAW_BUFFER_METADATA - CObjectID c_object_id - shared_ptr[CBuffer] data - shared_ptr[CBuffer] metadata = dynamic_pointer_cast[ - CBuffer, LocalMemoryBuffer]( - make_shared[LocalMemoryBuffer]( - (metadata_str.data()), metadata_str.size())) - - object_already_exists = self._create_put_buffer( - metadata, value.size(), object_id, &c_object_id, &data) - if not object_already_exists: - stream = pyarrow.FixedSizeBufferWriter( - pyarrow.py_buffer(Buffer.make(data))) - stream.set_memcopy_threads(MEMCOPY_THREADS) - stream.write(pyarrow.py_buffer(value)) - - with nogil: - check_status( - self.core_worker.get().Seal(c_object_id)) - - return ObjectID(c_object_id.Binary()) - - def put_pickle5_buffers(self, c_string inband, - Pickle5Writer writer, ObjectID object_id=None): - cdef: - CObjectID c_object_id - c_string metadata_str = PICKLE5_BUFFER_METADATA - shared_ptr[CBuffer] data - shared_ptr[CBuffer] metadata = dynamic_pointer_cast[ - CBuffer, LocalMemoryBuffer]( - make_shared[LocalMemoryBuffer]( - (metadata_str.data()), metadata_str.size())) - - object_already_exists = self._create_put_buffer( - metadata, writer.get_total_bytes(inband), - object_id, &c_object_id, &data) - if not object_already_exists: - writer.write_to(inband, data, MEMCOPY_THREADS) - with nogil: - check_status( - self.core_worker.get().Seal(c_object_id)) - return ObjectID(c_object_id.Binary()) def wait(self, object_ids, int num_returns, int64_t timeout_ms, @@ -1021,7 +998,6 @@ cdef class CoreWorker: cdef: c_vector[size_t] data_sizes c_string metadata_str - shared_ptr[CBuffer] empty_metadata c_vector[shared_ptr[CBuffer]] metadatas if return_ids.size() == 0: @@ -1036,31 +1012,13 @@ cdef class CoreWorker: elif output is NoReturn: serialized_objects.append(output) data_sizes.push_back(0) - metadatas.push_back(empty_metadata) - elif isinstance(output, bytes): - serialized_objects.append(output) - data_sizes.push_back(len(output)) - metadata_str = RAW_BUFFER_METADATA - metadatas.push_back(dynamic_pointer_cast[ - CBuffer, LocalMemoryBuffer]( - make_shared[LocalMemoryBuffer]( - (metadata_str.data()), - metadata_str.size(), True))) - elif worker.use_pickle: - inband, writer = worker._serialize_with_pickle5(output) - serialized_objects.append((inband, writer)) - data_sizes.push_back(writer.get_total_bytes(inband)) - metadata_str = PICKLE5_BUFFER_METADATA - metadatas.push_back(dynamic_pointer_cast[ - CBuffer, LocalMemoryBuffer]( - make_shared[LocalMemoryBuffer]( - (metadata_str.data()), - metadata_str.size(), True))) + metadatas.push_back(string_to_buffer(b'')) else: - serialized_object = worker._serialize_with_pyarrow(output) - serialized_objects.append(serialized_object) + context = worker.get_serialization_context() + serialized_object = context.serialize(output) data_sizes.push_back(serialized_object.total_bytes) - metadatas.push_back(empty_metadata) + metadatas.push_back(string_to_buffer(serialized_object.metadata)) + serialized_objects.append(serialized_object) check_status(self.core_worker.get().AllocateReturnObjects( return_ids, data_sizes, metadatas, returns)) @@ -1069,22 +1027,7 @@ cdef class CoreWorker: # A nullptr is returned if the object already exists. if returns[0][i].get() == NULL: continue - if serialized_object is NoReturn: returns[0][i].reset() - elif isinstance(serialized_object, bytes): - buffer = Buffer.make(returns[0][i].get().GetData()) - stream = pyarrow.FixedSizeBufferWriter( - pyarrow.py_buffer(buffer)) - stream.set_memcopy_threads(MEMCOPY_THREADS) - stream.write(pyarrow.py_buffer(serialized_object)) - elif worker.use_pickle: - inband, writer = serialized_object - (writer).write_to( - inband, returns[0][i].get().GetData(), MEMCOPY_THREADS) else: - buffer = Buffer.make(returns[0][i].get().GetData()) - stream = pyarrow.FixedSizeBufferWriter( - pyarrow.py_buffer(buffer)) - stream.set_memcopy_threads(MEMCOPY_THREADS) - serialized_object.write_to(stream) + write_serialized_object(serialized_object, returns[0][i].get().GetData()) diff --git a/python/ray/cloudpickle/__init__.py b/python/ray/cloudpickle/__init__.py index a2f166ac5..4389bd24c 100644 --- a/python/ray/cloudpickle/__init__.py +++ b/python/ray/cloudpickle/__init__.py @@ -1,7 +1,15 @@ from __future__ import absolute_import +import os import sys -if sys.version_info[:2] >= (3, 8): +CLOUDPICKLE_PATH = os.path.dirname(os.path.realpath(__file__)) + +if os.path.exists(os.path.join(CLOUDPICKLE_PATH, "..", "pickle5_files", "pickle5")): + HAS_PICKLE5 = True +else: + HAS_PICKLE5 = False + +if sys.version_info[:2] >= (3, 8) or HAS_PICKLE5: from ray.cloudpickle.cloudpickle_fast import * FAST_CLOUDPICKLE_USED = True else: diff --git a/python/ray/cloudpickle/cloudpickle_fast.py b/python/ray/cloudpickle/cloudpickle_fast.py index 691bc77cc..c26f7610d 100644 --- a/python/ray/cloudpickle/cloudpickle_fast.py +++ b/python/ray/cloudpickle/cloudpickle_fast.py @@ -15,22 +15,28 @@ import copyreg import io import itertools import logging -import _pickle -import pickle + import sys import types import weakref -from _pickle import Pickler - from .cloudpickle import ( _is_dynamic, _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, _find_imported_submodules, _get_cell_contents, _is_global, _builtin_type, Enum, _ensure_tracking, _make_skeleton_class, _make_skeleton_enum, - _extract_class_dict, string_types, dynamic_subimport, subimport + _extract_class_dict, string_types, dynamic_subimport, subimport, cell_set, + _make_empty_cell ) -load, loads = _pickle.load, _pickle.loads +if sys.version_info[:2] < (3, 8): + import pickle5 as pickle + from pickle5 import Pickler + load, loads = pickle.load, pickle.loads +else: + import _pickle + import pickle + from _pickle import Pickler + load, loads = _pickle.load, _pickle.loads # Shorthands similar to pickle.dump/pickle.dumps @@ -108,7 +114,6 @@ def _function_getstate(func): "__defaults__": func.__defaults__, "__module__": func.__module__, "__doc__": func.__doc__, - "__closure__": func.__closure__, } f_globals_ref = _extract_code_globals(func.__code__) @@ -187,25 +192,46 @@ def _enum_getstate(obj): def _code_reduce(obj): """codeobject reducer""" - args = ( - obj.co_argcount, obj.co_posonlyargcount, - obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, - obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, - obj.co_varnames, obj.co_filename, obj.co_name, - obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, - obj.co_cellvars - ) + if hasattr(obj, "co_posonlyargcount"): # pragma: no branch + args = ( + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, + obj.co_cellvars + ) + else: + args = ( + obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, + obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, + obj.co_names, obj.co_varnames, obj.co_filename, + obj.co_name, obj.co_firstlineno, obj.co_lnotab, + obj.co_freevars, obj.co_cellvars + ) return types.CodeType, args +def _make_cell(contents): + cell = _make_empty_cell() + cell_set(cell, contents) + return cell + + def _cell_reduce(obj): """Cell (containing values of a function's free variables) reducer""" try: - obj.cell_contents + contents = (obj.cell_contents,) except ValueError: # cell is empty - return types.CellType, () + contents = () + + if sys.version_info[:2] < (3, 8): + if contents: + return _make_cell, contents + else: + return _make_empty_cell, () else: - return types.CellType, (obj.cell_contents,) + return types.CellType, contents def _classmethod_reduce(obj): @@ -347,7 +373,6 @@ def _function_setstate(obj, state): obj.__dict__.update(state) obj_globals = slotstate.pop("__globals__") - obj_closure = slotstate.pop("__closure__") # _cloudpickle_subimports is a set of submodules that must be loaded for # the pickled function to work correctly at unpickling time. Now that these # submodules are depickled (hence imported), they can be removed from the @@ -358,14 +383,6 @@ def _function_setstate(obj, state): obj.__globals__.update(obj_globals) obj.__globals__["__builtins__"] = __builtins__ - if obj_closure is not None: - for i, cell in enumerate(obj_closure): - try: - value = cell.cell_contents - except ValueError: # cell is empty - continue - obj.__closure__[i].cell_contents = value - for k, v in slotstate.items(): setattr(obj, k, v) @@ -385,6 +402,11 @@ def _class_setstate(obj, state): return obj +def _property_reduce(obj): + # Python < 3.8 only + return property, (obj.fget, obj.fset, obj.fdel, obj.__doc__) + + class CloudPickler(Pickler): """Fast C Pickler extension with additional reducing routines. @@ -407,13 +429,18 @@ class CloudPickler(Pickler): dispatch[logging.RootLogger] = _root_logger_reduce dispatch[memoryview] = _memoryview_reduce dispatch[staticmethod] = _classmethod_reduce - dispatch[types.CellType] = _cell_reduce dispatch[types.CodeType] = _code_reduce dispatch[types.GetSetDescriptorType] = _getset_descriptor_reduce dispatch[types.ModuleType] = _module_reduce dispatch[types.MethodType] = _method_reduce dispatch[types.MappingProxyType] = _mappingproxy_reduce dispatch[weakref.WeakSet] = _weakset_reduce + if sys.version_info[:2] >= (3, 8): + dispatch[types.CellType] = _cell_reduce + else: + dispatch[type(_make_empty_cell())] = _cell_reduce + if sys.version_info[:2] < (3, 8): + dispatch[property] = _property_reduce def __init__(self, file, protocol=None, buffer_callback=None): if protocol is None: @@ -523,15 +550,7 @@ class CloudPickler(Pickler): if k in func.__globals__: base_globals[k] = func.__globals__[k] - # Do not bind the free variables before the function is created to - # avoid infinite recursion. - if func.__closure__ is None: - closure = None - else: - closure = tuple( - types.CellType() for _ in range(len(code.co_freevars))) - - return code, base_globals, None, None, closure + return code, base_globals, None, None, func.__closure__ def dump(self, obj): try: diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index b45cdd6aa..0cd439d48 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -233,9 +233,9 @@ def cli(logging_level, logging_format): default=False, help="Specify whether load code from local file or GCS serialization.") @click.option( - "--use-pickle", + "--use-pickle/--no-use-pickle", is_flag=True, - default=False, + default=ray.cloudpickle.FAST_CLOUDPICKLE_USED, help="Use pickle for serialization.") def start(node_ip_address, redis_address, address, redis_port, num_redis_shards, redis_max_clients, redis_password, diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 0998888e8..fa923c1d0 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -1,6 +1,26 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import hashlib +import io +import logging +import time +import pyarrow +import pyarrow.plasma as plasma +import ray.cloudpickle as pickle +from ray import ray_constants, JobID +import ray.utils +from ray.utils import _random_string +from ray.gcs_utils import ErrorType +from ray.exceptions import ( + RayActorError, + RayWorkerError, + UnreconstructableError, + RAY_EXCEPTION_TYPES, +) +from ray._raylet import Pickle5Writer, unpack_pickle5_buffers + +logger = logging.getLogger(__name__) class RayNotDictionarySerializable(Exception): @@ -13,6 +33,522 @@ class CloudPickleError(Exception): pass +class DeserializationError(Exception): + pass + + +class SerializedObject(object): + def __init__(self, metadata): + self._metadata = metadata + + @property + def total_bytes(self): + raise NotImplementedError + + @property + def metadata(self): + return self._metadata + + +class Pickle5SerializedObject(SerializedObject): + def __init__(self, inband, writer): + super(Pickle5SerializedObject, + self).__init__(ray_constants.PICKLE5_BUFFER_METADATA) + self.inband = inband + self.writer = writer + # cached total bytes + self._total_bytes = None + + @property + def total_bytes(self): + if self._total_bytes is None: + self._total_bytes = self.writer.get_total_bytes(self.inband) + return self._total_bytes + + +class ArrowSerializedObject(SerializedObject): + def __init__(self, serialized_object): + super(ArrowSerializedObject, self).__init__(b"") + self.serialized_object = serialized_object + + @property + def total_bytes(self): + return self.serialized_object.total_bytes + + +class RawSerializedObject(SerializedObject): + def __init__(self, value): + super(RawSerializedObject, + self).__init__(ray_constants.RAW_BUFFER_METADATA) + self.value = value + + @property + def total_bytes(self): + return len(self.value) + + +def _try_to_compute_deterministic_class_id(cls, depth=5): + """Attempt to produce a deterministic class ID for a given class. + + The goal here is for the class ID to be the same when this is run on + different worker processes. Pickling, loading, and pickling again seems to + produce more consistent results than simply pickling. This is a bit crazy + and could cause problems, in which case we should revert it and figure out + something better. + + Args: + cls: The class to produce an ID for. + depth: The number of times to repeatedly try to load and dump the + string while trying to reach a fixed point. + + Returns: + A class ID for this class. We attempt to make the class ID the same + when this function is run on different workers, but that is not + guaranteed. + + Raises: + Exception: This could raise an exception if cloudpickle raises an + exception. + """ + # Pickling, loading, and pickling again seems to produce more consistent + # results than simply pickling. This is a bit + class_id = pickle.dumps(cls) + for _ in range(depth): + new_class_id = pickle.dumps(pickle.loads(class_id)) + if new_class_id == class_id: + # We appear to have reached a fix point, so use this as the ID. + return hashlib.sha1(new_class_id).digest() + class_id = new_class_id + + # We have not reached a fixed point, so we may end up with a different + # class ID for this custom class on each worker, which could lead to the + # same class definition being exported many many times. + logger.warning( + "WARNING: Could not produce a deterministic class ID for class " + "{}".format(cls)) + return hashlib.sha1(new_class_id).digest() + + +class SerializationContext(object): + """Initialize the serialization library. + + This defines a custom serializer for object IDs and also tells ray to + serialize several exception classes that we define for error handling. + """ + + def __init__(self, worker): + self.worker = worker + self.use_pickle = worker.use_pickle + + def actor_handle_serializer(obj): + return obj._serialization_helper(True) + + def actor_handle_deserializer(serialized_obj): + new_handle = ray.actor.ActorHandle.__new__(ray.actor.ActorHandle) + new_handle._deserialization_helper(serialized_obj, True) + return new_handle + + if not worker.use_pickle: + serialization_context = pyarrow.default_serialization_context() + # Tell the serialization context to use the cloudpickle version + # that we ship with Ray. + serialization_context.set_pickle(pickle.dumps, pickle.loads) + pyarrow.register_torch_serialization_handlers( + serialization_context) + + def id_serializer(obj): + if isinstance(obj, + ray.ObjectID) and obj.is_direct_actor_type(): + raise NotImplementedError( + "Objects produced by direct actor calls cannot be " + "passed to other tasks as arguments.") + return pickle.dumps(obj) + + def id_deserializer(serialized_obj): + return pickle.loads(serialized_obj) + + for id_type in ray._raylet._ID_TYPES: + serialization_context.register_type( + id_type, + "{}.{}".format(id_type.__module__, id_type.__name__), + custom_serializer=id_serializer, + custom_deserializer=id_deserializer) + + # We register this serializer on each worker instead of calling + # _register_custom_serializer from the driver so that isinstance + # still works. + serialization_context.register_type( + ray.actor.ActorHandle, + "ray.ActorHandle", + pickle=False, + custom_serializer=actor_handle_serializer, + custom_deserializer=actor_handle_deserializer) + self.pyarrow_context = serialization_context + else: + self._register_cloudpickle_serializer( + ray.actor.ActorHandle, + custom_serializer=actor_handle_serializer, + custom_deserializer=actor_handle_deserializer) + + def id_serializer(obj): + if isinstance(obj, + ray.ObjectID) and obj.is_direct_actor_type(): + raise NotImplementedError( + "Objects produced by direct actor calls cannot be " + "passed to other tasks as arguments.") + return obj.__reduce__() + + def id_deserializer(serialized_obj): + return serialized_obj[0](*serialized_obj[1]) + + for id_type in ray._raylet._ID_TYPES: + self._register_cloudpickle_serializer(id_type, id_serializer, + id_deserializer) + + def initialize(self): + """ Register custom serializers """ + if not self.worker.use_pickle: + for error_cls in RAY_EXCEPTION_TYPES: + self.register_custom_serializer( + error_cls, + use_dict=True, + local=True, + class_id=error_cls.__module__ + ". " + error_cls.__name__, + ) + # Tell Ray to serialize lambdas with pickle. + self.register_custom_serializer( + type(lambda: 0), + use_pickle=True, + local=True, + class_id="lambda") + # Tell Ray to serialize types with pickle. + self.register_custom_serializer( + type(int), use_pickle=True, local=True, class_id="type") + # Tell Ray to serialize RayParameters as dictionaries. This is + # used when passing around actor handles. + self.register_custom_serializer( + ray.signature.RayParameter, + use_dict=True, + local=True, + class_id="ray.signature.RayParameter") + # Tell Ray to serialize StringIO with pickle. We do this because + # Ray's default __dict__ serialization is incorrect for this type + # (the object's __dict__ is empty and therefore doesn't + # contain the full state of the object). + self.register_custom_serializer( + io.StringIO, + use_pickle=True, + local=True, + class_id="io.StringIO") + + def _register_cloudpickle_serializer(self, cls, custom_serializer, + custom_deserializer): + if pickle.FAST_CLOUDPICKLE_USED: + + def _CloudPicklerReducer(obj): + return custom_deserializer, (custom_serializer(obj), ) + + # construct a reducer + pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer + else: + + def _CloudPicklerReducer(_self, obj): + _self.save_reduce( + custom_deserializer, (custom_serializer(obj), ), obj=obj) + + # use a placeholder for 'self' argument + pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer + + def _deserialize_object_from_arrow(self, data, metadata, object_id): + if metadata: + if metadata == ray_constants.PICKLE5_BUFFER_METADATA: + if not self.use_pickle: + raise ValueError("Receiving pickle5 serialized objects " + "while the serialization context is " + "using pyarrow as the backend.") + try: + in_band, buffers = unpack_pickle5_buffers(data) + if len(buffers) > 0: + return pickle.loads(in_band, buffers=buffers) + else: + return pickle.loads(in_band) + # cloudpickle does not provide error types + except pickle.pickle.PicklingError: + raise DeserializationError() + # Check if the object should be returned as raw bytes. + if metadata == ray_constants.RAW_BUFFER_METADATA: + if data is None: + return b"" + return data.to_pybytes() + # Otherwise, return an exception object based on + # the error type. + error_type = int(metadata) + if error_type == ErrorType.Value("WORKER_DIED"): + return RayWorkerError() + elif error_type == ErrorType.Value("ACTOR_DIED"): + return RayActorError() + elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): + return UnreconstructableError(ray.ObjectID(object_id.binary())) + else: + assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \ + "Tried to get object that has been promoted to plasma." + assert False, "Unrecognized error type " + str(error_type) + elif data: + if self.use_pickle: + raise ValueError("Receiving plasma serialized objects " + "while the serialization context is " + "using pickle5 as the backend.") + try: + # If data is not empty, deserialize the object. + return pyarrow.deserialize(data, self.pyarrow_context) + except pyarrow.DeserializationCallbackError: + raise DeserializationError() + else: + # Object isn't available in plasma. + return plasma.ObjectNotAvailable + + def _store_and_register_pyarrow(self, value, depth=100): + """Store an object and attempt to register its class if needed. + + Args: + value: The value to put in the object store. + depth: The maximum number of classes to recursively register. + + Raises: + Exception: An exception is raised if the attempt to serialize the + object fails. + """ + counter = 0 + while True: + if counter == depth: + raise Exception("Ray exceeded the maximum number of classes " + "that it will recursively serialize when " + "attempting to serialize an object of " + "type {}.".format(type(value))) + counter += 1 + try: + return pyarrow.serialize(value, self.pyarrow_context) + except pyarrow.SerializationCallbackError as e: + cls_type = type(e.example_object) + try: + self.register_custom_serializer(cls_type, use_dict=True) + warning_message = ( + "WARNING: Serializing objects of type " + "{} by expanding them as dictionaries " + "of their fields. This behavior may " + "be incorrect in some cases.".format(cls_type)) + logger.debug(warning_message) + except (RayNotDictionarySerializable, CloudPickleError, + pickle.pickle.PicklingError, Exception): + # We also handle generic exceptions here because + # cloudpickle can fail with many different types of errors. + warning_message = ( + "Falling back to serializing {} objects by using " + "pickle. Use `ray.register_custom_serializer({},...)` " + "to provide faster serialization.".format( + cls_type, cls_type)) + try: + self.register_custom_serializer( + cls_type, use_pickle=True) + logger.warning(warning_message) + except (CloudPickleError, ValueError): + self.register_custom_serializer( + cls_type, use_pickle=True, local=True) + warning_message = ("WARNING: Pickling the class {} " + "failed, so we are using pickle " + "and only registering the class " + "locally.".format(cls_type)) + logger.warning(warning_message) + + def deserialize_objects(self, + data_metadata_pairs, + object_ids, + error_timeout=10): + pass + assert len(data_metadata_pairs) == len(object_ids) + + start_time = time.time() + results = [] + warning_sent = False + i = 0 + while i < len(object_ids): + object_id = object_ids[i] + data, metadata = data_metadata_pairs[i] + try: + results.append( + self._deserialize_object_from_arrow( + data, metadata, object_id)) + i += 1 + except DeserializationError: + # Wait a little bit for the import thread to import the class. + # If we currently have the worker lock, we need to release it + # so that the import thread can acquire it. + time.sleep(0.01) + + if time.time() - start_time > error_timeout: + warning_message = ("This worker or driver is waiting to " + "receive a class definition so that it " + "can deserialize an object from the " + "object store. This may be fine, or it " + "may be a bug.") + if not warning_sent: + ray.utils.push_error_to_driver( + self, + ray_constants.WAIT_FOR_CLASS_PUSH_ERROR, + warning_message, + job_id=self.worker.current_job_id) + warning_sent = True + + return results + + def serialize(self, value): + """Serialize an object. + + Args: + value: The value to serialize. + """ + if isinstance(value, bytes): + # If the object is a byte array, skip serializing it and + # use a special metadata to indicate it's raw binary. So + # that this object can also be read by Java. + return RawSerializedObject(value) + + if self.worker.use_pickle: + writer = Pickle5Writer() + if ray.cloudpickle.FAST_CLOUDPICKLE_USED: + inband = pickle.dumps( + value, protocol=5, buffer_callback=writer.buffer_callback) + else: + inband = pickle.dumps(value) + return Pickle5SerializedObject(inband, writer) + else: + try: + serialized_value = self._store_and_register_pyarrow(value) + except TypeError: + # TypeError can happen because one of the members of the object + # may not be serializable for cloudpickle. So we need + # these extra fallbacks here to start from the beginning. + # Hopefully the object could have a `__reduce__` method. + self.register_custom_serializer(type(value), use_pickle=True) + logger.warning("WARNING: Serializing the class {} failed, " + "falling back to cloudpickle.".format( + type(value))) + serialized_value = self._store_and_register_pyarrow(value) + + return ArrowSerializedObject(serialized_value) + + def register_custom_serializer(self, + cls, + use_pickle=False, + use_dict=False, + serializer=None, + deserializer=None, + local=False, + job_id=None, + class_id=None): + """Enable serialization and deserialization for a particular class. + + This method runs the register_class function defined below on + every worker, which will enable ray to properly serialize and + deserialize objects of this class. + + Args: + cls (type): The class that ray should use this custom serializer + for. + use_pickle (bool): If true, then objects of this class will be + serialized using pickle. + use_dict: If true, then objects of this class be serialized + turning their __dict__ fields into a dictionary. Must be False + if use_pickle is true. + serializer: The custom serializer to use. This should be provided + if and only if use_pickle and use_dict are False. + deserializer: The custom deserializer to use. This should be + provided if and only if use_pickle and use_dict are False. + local: True if the serializers should only be registered on the + current worker. This should usually be False. + job_id: ID of the job that we want to register the class for. + class_id (str): Unique ID of the class. Autogenerated if None. + + Raises: + RayNotDictionarySerializable: Raised if use_dict is true and cls + cannot be efficiently serialized by Ray. + ValueError: Raised if ray could not autogenerate a class_id. + """ + assert (serializer is None) == (deserializer is None), ( + "The serializer/deserializer arguments must both be provided or " + "both not be provided.") + use_custom_serializer = (serializer is not None) + + assert use_custom_serializer + use_pickle + use_dict == 1, ( + "Exactly one of use_pickle, use_dict, or serializer/deserializer " + "must be specified.") + + if self.worker.use_pickle and serializer is None: + # In this case it should do nothing. + return + + if use_dict: + # Raise an exception if cls cannot be serialized + # efficiently by Ray. + check_serializable(cls) + + if class_id is None: + if not local: + # In this case, the class ID will be used to deduplicate the + # class across workers. Note that cloudpickle unfortunately + # does not produce deterministic strings, so these IDs could + # be different on different workers. We could use something + # weaker like cls.__name__, however that would run the risk + # of having collisions. + # TODO(rkn): We should improve this. + try: + # Attempt to produce a class ID that will be the same on + # each worker. However, determinism is not guaranteed, + # and the result may be different on different workers. + class_id = _try_to_compute_deterministic_class_id(cls) + except Exception: + raise ValueError( + "Failed to use pickle in generating a unique id" + "for '{}'. Provide a unique class_id.".format(cls)) + else: + # In this case, the class ID only needs to be meaningful on + # this worker and not across workers. + class_id = _random_string() + + # Make sure class_id is a string. + class_id = ray.utils.binary_to_hex(class_id) + + if job_id is None: + job_id = self.worker.current_job_id + assert isinstance(job_id, JobID) + + def register_class_for_serialization(worker_info): + context = worker_info["worker"].get_serialization_context(job_id) + if worker_info["worker"].use_pickle: + context._register_cloudpickle_serializer( + cls, serializer, deserializer) + else: + # TODO(rkn): We need to be more thoughtful about what to do if + # custom serializers have already been registered for + # class_id. In some cases, we may want to use the last + # user-defined serializers and ignore subsequent calls to + # register_custom_serializer that were made by the system. + context.pyarrow_context.register_type( + cls, + class_id, + pickle=use_pickle, + custom_serializer=serializer, + custom_deserializer=deserializer) + + if not local: + self.worker.run_function_on_all_workers( + register_class_for_serialization) + else: + # Since we are pickling objects of this class, we don't actually + # need to ship the class definition. + register_class_for_serialization({"worker": self.worker}) + + def check_serializable(cls): """Throws an exception if Ray cannot serialize this class efficiently. @@ -41,9 +577,9 @@ def check_serializable(cls): obj = cls.__new__(cls) except Exception: raise RayNotDictionarySerializable("The class {} has overridden " - "'__new__', so Ray may not be able " - "to serialize it efficiently." - .format(cls)) + "'__new__', so Ray may not be " + "able to serialize it " + "efficiently.".format(cls)) if not hasattr(obj, "__dict__"): raise RayNotDictionarySerializable("Objects of the class {} do not " "have a '__dict__' attribute, so " diff --git a/python/ray/tests/cluster_utils.py b/python/ray/tests/cluster_utils.py index 80107f6d3..09f527575 100644 --- a/python/ray/tests/cluster_utils.py +++ b/python/ray/tests/cluster_utils.py @@ -81,6 +81,7 @@ class Cluster(object): "object_store_memory": 150 * 1024 * 1024, # 150 MiB } ray_params = ray.parameter.RayParams(**node_args) + ray_params.use_pickle = ray.cloudpickle.FAST_CLOUDPICKLE_USED ray_params.update_if_absent(**default_kwargs) if self.head_node is None: node = ray.node.Node( diff --git a/python/ray/worker.py b/python/ray/worker.py index af8a1d5e2..6bca9e082 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -8,7 +8,6 @@ import atexit import faulthandler import hashlib import inspect -import io import json import logging import os @@ -22,8 +21,6 @@ import traceback import random # Ray modules -import pyarrow -import pyarrow.plasma as plasma import ray.cloudpickle as pickle import ray.gcs_utils import ray.memory_monitor as memory_monitor @@ -43,17 +40,11 @@ from ray import ( ) from ray import import_thread from ray import profiling -from ray._raylet import Pickle5Writer, unpack_pickle5_buffers -from ray.gcs_utils import ErrorType from ray.exceptions import ( - RayActorError, RayError, RayTaskError, - RayWorkerError, ObjectStoreFullError, - UnreconstructableError, - RAY_EXCEPTION_TYPES, ) from ray.function_manager import FunctionActorManager from ray.utils import ( @@ -203,7 +194,7 @@ class Worker(object): if self.actor_init_error is not None: raise self.actor_init_error - def get_serialization_context(self, job_id): + def get_serialization_context(self, job_id=None): """Get the SerializationContext of the job that this worker is processing. Args: @@ -213,13 +204,17 @@ class Worker(object): Returns: The serialization context of the given job. """ - # This function needs to be proctected by a lock, because it will be + # This function needs to be protected by a lock, because it will be # called by`register_class_for_serialization`, as well as the import # thread, from different threads. Also, this function will recursively # call itself, so we use RLock here. + if job_id is None: + job_id = self.current_job_id with self.lock: if job_id not in self.serialization_context_map: - _initialize_serialization(job_id) + self.serialization_context_map[ + job_id] = serialization.SerializationContext(self) + self.serialization_context_map[job_id].initialize() return self.serialization_context_map[job_id] def check_connected(self): @@ -284,199 +279,17 @@ class Worker(object): "do this, you can wrap the ray.ObjectID in a list and " "call 'put' on it (or return it).") - if isinstance(value, bytes): - # If the object is a byte array, skip serializing it and - # use a special metadata to indicate it's raw binary. So - # that this object can also be read by Java. - return self.core_worker.put_raw_buffer(value, object_id=object_id) - - if self.use_pickle: - return self._serialize_and_put_pickle5(value, object_id=object_id) - else: - return self._serialize_and_put_pyarrow(value, object_id=object_id) - - def _serialize_and_put_pickle5(self, value, object_id=None): - """Serialize an object using pickle5 and store it in the object store. - - Args: - value: The value to put in the object store. - object_id: The ID of the object to store. If none, one will be - generated. - - Raises: - Exception: An exception is raised if the attempt to store the - object fails. This can happen if the object store is full. - """ - inband, writer = self._serialize_with_pickle5(value) - return self.core_worker.put_pickle5_buffers( - inband, writer, object_id=object_id) - - def _serialize_with_pickle5(self, value): - writer = Pickle5Writer() - if ray.cloudpickle.FAST_CLOUDPICKLE_USED: - inband = pickle.dumps( - value, protocol=5, buffer_callback=writer.buffer_callback) - else: - inband = pickle.dumps(value) - return inband, writer - - def _serialize_and_put_pyarrow(self, value, object_id=None): - """Wraps `store_and_register` with cases for existence and pickling. - - Args: - object_id (object_id.ObjectID): The object ID of the value to be - put. - value: The value to put in the object store. - """ - serialized_value = self._serialize_with_pyarrow(value) + serialized_value = self.get_serialization_context().serialize(value) return self.core_worker.put_serialized_object( serialized_value, object_id=object_id) - def _serialize_with_pyarrow(self, value): - try: - serialized_value = self._store_and_register_pyarrow(value) - except TypeError: - # TypeError can happen because one of the members of the object - # may not be serializable for cloudpickle. So we need - # these extra fallbacks here to start from the beginning. - # Hopefully the object could have a `__reduce__` method. - _register_custom_serializer(type(value), use_pickle=True) - logger.warning("WARNING: Serializing the class {} failed, " - "falling back to cloudpickle.".format(type(value))) - serialized_value = self._store_and_register_pyarrow(value) - - return serialized_value - - def _store_and_register_pyarrow(self, value, depth=100): - """Store an object and attempt to register its class if needed. - - Args: - value: The value to put in the object store. - depth: The maximum number of classes to recursively register. - - Raises: - Exception: An exception is raised if the attempt to serialize the - object fails. - """ - counter = 0 - while True: - if counter == depth: - raise Exception("Ray exceeded the maximum number of classes " - "that it will recursively serialize when " - "attempting to serialize an object of " - "type {}.".format(type(value))) - counter += 1 - try: - serialization_context = self.get_serialization_context( - self.current_job_id) - return pyarrow.serialize(value, serialization_context) - except pyarrow.SerializationCallbackError as e: - cls_type = type(e.example_object) - try: - _register_custom_serializer(cls_type, use_dict=True) - warning_message = ( - "WARNING: Serializing objects of type " - "{} by expanding them as dictionaries " - "of their fields. This behavior may " - "be incorrect in some cases.".format(cls_type)) - logger.debug(warning_message) - except (serialization.RayNotDictionarySerializable, - serialization.CloudPickleError, - pickle.pickle.PicklingError, Exception): - # We also handle generic exceptions here because - # cloudpickle can fail with many different types of errors. - warning_message = ( - "Falling back to serializing {} objects by using " - "pickle. Use `ray.register_custom_serializer({},...)` " - "to provide faster serialization.".format( - cls_type, cls_type)) - try: - _register_custom_serializer(cls_type, use_pickle=True) - logger.warning(warning_message) - except (serialization.CloudPickleError, ValueError): - _register_custom_serializer( - cls_type, use_pickle=True, local=True) - warning_message = ("WARNING: Pickling the class {} " - "failed, so we are using pickle " - "and only registering the class " - "locally.".format(cls_type)) - logger.warning(warning_message) - def deserialize_objects(self, data_metadata_pairs, object_ids, error_timeout=10): - assert len(data_metadata_pairs) == len(object_ids) - - start_time = time.time() - serialization_context = self.get_serialization_context( - self.current_job_id) - results = [] - warning_sent = False - i = 0 - while i < len(object_ids): - object_id = object_ids[i] - data, metadata = data_metadata_pairs[i] - try: - results.append( - self._deserialize_object_from_arrow( - data, metadata, object_id, serialization_context)) - i += 1 - except pyarrow.DeserializationCallbackError: - # Wait a little bit for the import thread to import the class. - # If we currently have the worker lock, we need to release it - # so that the import thread can acquire it. - time.sleep(0.01) - - if time.time() - start_time > error_timeout: - warning_message = ("This worker or driver is waiting to " - "receive a class definition so that it " - "can deserialize an object from the " - "object store. This may be fine, or it " - "may be a bug.") - if not warning_sent: - ray.utils.push_error_to_driver( - self, - ray_constants.WAIT_FOR_CLASS_PUSH_ERROR, - warning_message, - job_id=self.current_job_id) - warning_sent = True - - return results - - def _deserialize_object_from_arrow(self, data, metadata, object_id, - serialization_context): - if metadata: - if metadata == ray_constants.PICKLE5_BUFFER_METADATA: - in_band, buffers = unpack_pickle5_buffers(data) - if len(buffers) > 0: - return pickle.loads(in_band, buffers=buffers) - else: - return pickle.loads(in_band) - # Check if the object should be returned as raw bytes. - if metadata == ray_constants.RAW_BUFFER_METADATA: - if data is None: - return b"" - return data.to_pybytes() - # Otherwise, return an exception object based on - # the error type. - error_type = int(metadata) - if error_type == ErrorType.Value("WORKER_DIED"): - return RayWorkerError() - elif error_type == ErrorType.Value("ACTOR_DIED"): - return RayActorError() - elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): - return UnreconstructableError(ray.ObjectID(object_id.binary())) - else: - assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \ - "Tried to get object that has been promoted to plasma." - assert False, "Unrecognized error type " + str(error_type) - elif data: - # If data is not empty, deserialize the object. - return pyarrow.deserialize(data, serialization_context) - else: - # Object isn't available in plasma. - return plasma.ObjectNotAvailable + context = self.get_serialization_context() + return context.deserialize_objects(data_metadata_pairs, object_ids, + error_timeout) def get_objects(self, object_ids): """Get the values in the object store associated with the IDs. @@ -712,98 +525,6 @@ def print_failed_task(task_status): task_status["error_message"])) -def _initialize_serialization(job_id, worker=global_worker): - """Initialize the serialization library. - - This defines a custom serializer for object IDs and also tells ray to - serialize several exception classes that we define for error handling. - """ - serialization_context = pyarrow.default_serialization_context() - # Tell the serialization context to use the cloudpickle version that we - # ship with Ray. - serialization_context.set_pickle(pickle.dumps, pickle.loads) - pyarrow.register_torch_serialization_handlers(serialization_context) - - def id_serializer(obj): - if isinstance(obj, ray.ObjectID) and obj.is_direct_actor_type(): - raise NotImplementedError( - "Objects produced by direct actor calls cannot be " - "passed to other tasks as arguments.") - return pickle.dumps(obj) - - def id_deserializer(serialized_obj): - return pickle.loads(serialized_obj) - - for id_type in ray._raylet._ID_TYPES: - serialization_context.register_type( - id_type, - "{}.{}".format(id_type.__module__, id_type.__name__), - custom_serializer=id_serializer, - custom_deserializer=id_deserializer) - - def actor_handle_serializer(obj): - return obj._serialization_helper(True) - - def actor_handle_deserializer(serialized_obj): - new_handle = ray.actor.ActorHandle.__new__(ray.actor.ActorHandle) - new_handle._deserialization_helper(serialized_obj, True) - return new_handle - - # We register this serializer on each worker instead of calling - # _register_custom_serializer from the driver so that isinstance still - # works. - serialization_context.register_type( - ray.actor.ActorHandle, - "ray.ActorHandle", - pickle=False, - custom_serializer=actor_handle_serializer, - custom_deserializer=actor_handle_deserializer) - - worker.serialization_context_map[job_id] = serialization_context - - if not worker.use_pickle: - for error_cls in RAY_EXCEPTION_TYPES: - _register_custom_serializer( - error_cls, - use_dict=True, - local=True, - job_id=job_id, - class_id=error_cls.__module__ + ". " + error_cls.__name__, - ) - # Tell Ray to serialize lambdas with pickle. - _register_custom_serializer( - type(lambda: 0), - use_pickle=True, - local=True, - job_id=job_id, - class_id="lambda") - # Tell Ray to serialize types with pickle. - _register_custom_serializer( - type(int), - use_pickle=True, - local=True, - job_id=job_id, - class_id="type") - # Tell Ray to serialize RayParameters as dictionaries. This is - # used when passing around actor handles. - _register_custom_serializer( - ray.signature.RayParameter, - use_dict=True, - local=True, - job_id=job_id, - class_id="ray.signature.RayParameter") - # Tell Ray to serialize StringIO with pickle. We do this because - # Ray's default __dict__ serialization is incorrect for this type - # (the object's __dict__ is empty and therefore doesn't - # contain the full state of the object). - _register_custom_serializer( - io.StringIO, - use_pickle=True, - local=True, - job_id=job_id, - class_id="io.StringIO") - - def init(address=None, redis_address=None, num_cpus=None, @@ -835,7 +556,7 @@ def init(address=None, raylet_socket_name=None, temp_dir=None, load_code_from_local=False, - use_pickle=False, + use_pickle=ray.cloudpickle.FAST_CLOUDPICKLE_USED, _internal_config=None): """Connect to an existing Ray cluster or start one and connect to it. @@ -1610,48 +1331,6 @@ def _changeproctitle(title, next_title): setproctitle.setproctitle(next_title) -def _try_to_compute_deterministic_class_id(cls, depth=5): - """Attempt to produce a deterministic class ID for a given class. - - The goal here is for the class ID to be the same when this is run on - different worker processes. Pickling, loading, and pickling again seems to - produce more consistent results than simply pickling. This is a bit crazy - and could cause problems, in which case we should revert it and figure out - something better. - - Args: - cls: The class to produce an ID for. - depth: The number of times to repeatedly try to load and dump the - string while trying to reach a fixed point. - - Returns: - A class ID for this class. We attempt to make the class ID the same - when this function is run on different workers, but that is not - guaranteed. - - Raises: - Exception: This could raise an exception if cloudpickle raises an - exception. - """ - # Pickling, loading, and pickling again seems to produce more consistent - # results than simply pickling. This is a bit - class_id = pickle.dumps(cls) - for _ in range(depth): - new_class_id = pickle.dumps(pickle.loads(class_id)) - if new_class_id == class_id: - # We appear to have reached a fix point, so use this as the ID. - return hashlib.sha1(new_class_id).digest() - class_id = new_class_id - - # We have not reached a fixed point, so we may end up with a different - # class ID for this custom class on each worker, which could lead to the - # same class definition being exported many many times. - logger.warning( - "WARNING: Could not produce a deterministic class ID for class " - "{}".format(cls)) - return hashlib.sha1(new_class_id).digest() - - def register_custom_serializer(cls, serializer=None, deserializer=None, @@ -1664,7 +1343,7 @@ def register_custom_serializer(cls, The serializer and deserializer are used when transferring objects of `cls` across processes and nodes. This can be significantly faster than - the Ray default fallbacks. Wraps `_register_custom_serializer` underneath. + the Ray default fallbacks. Wraps `register_custom_serializer` underneath. `use_pickle` tells Ray to automatically use cloudpickle for serialization, and `use_dict` automatically uses `cls.__dict__`. @@ -1697,13 +1376,14 @@ def register_custom_serializer(cls, raise DeprecationWarning( "`job_id` is no longer a valid parameter and will be removed in " "future versions of Ray. If this breaks your application, " - "see `ray.worker._register_custom_serializer`.") + "see `SerializationContext.register_custom_serializer`.") if local: raise DeprecationWarning( "`local` is no longer a valid parameter and will be removed in " "future versions of Ray. If this breaks your application, " - "see `ray.worker._register_custom_serializer`.") - _register_custom_serializer( + "see `SerializationContext.register_custom_serializer`.") + context = global_worker.get_serialization_context() + context.register_custom_serializer( cls, use_pickle=use_pickle, use_dict=use_dict, @@ -1712,126 +1392,6 @@ def register_custom_serializer(cls, class_id=class_id) -def _register_custom_serializer(cls, - use_pickle=False, - use_dict=False, - serializer=None, - deserializer=None, - local=False, - job_id=None, - class_id=None): - """Enable serialization and deserialization for a particular class. - - This method runs the register_class function defined below on every worker, - which will enable ray to properly serialize and deserialize objects of - this class. - - Args: - cls (type): The class that ray should use this custom serializer for. - use_pickle (bool): If true, then objects of this class will be - serialized using pickle. - use_dict: If true, then objects of this class be serialized turning - their __dict__ fields into a dictionary. Must be False if - use_pickle is true. - serializer: The custom serializer to use. This should be provided if - and only if use_pickle and use_dict are False. - deserializer: The custom deserializer to use. This should be provided - if and only if use_pickle and use_dict are False. - local: True if the serializers should only be registered on the current - worker. This should usually be False. - job_id: ID of the job that we want to register the class for. - class_id (str): Unique ID of the class. Autogenerated if None. - - Raises: - RayNotDictionarySerializable: Raised if use_dict is true and cls cannot - be efficiently serialized by Ray. - ValueError: Raised if ray could not autogenerate a class_id. - """ - worker = global_worker - assert (serializer is None) == (deserializer is None), ( - "The serializer/deserializer arguments must both be provided or " - "both not be provided.") - use_custom_serializer = (serializer is not None) - - assert use_custom_serializer + use_pickle + use_dict == 1, ( - "Exactly one of use_pickle, use_dict, or serializer/deserializer must " - "be specified.") - - if worker.use_pickle and serializer is None: - # In this case it should do nothing. - return - - if use_dict: - # Raise an exception if cls cannot be serialized efficiently by Ray. - serialization.check_serializable(cls) - - if class_id is None: - if not local: - # In this case, the class ID will be used to deduplicate the class - # across workers. Note that cloudpickle unfortunately does not - # produce deterministic strings, so these IDs could be different - # on different workers. We could use something weaker like - # cls.__name__, however that would run the risk of having - # collisions. - # TODO(rkn): We should improve this. - try: - # Attempt to produce a class ID that will be the same on each - # worker. However, determinism is not guaranteed, and the - # result may be different on different workers. - class_id = _try_to_compute_deterministic_class_id(cls) - except Exception: - raise ValueError( - "Failed to use pickle in generating a unique id for '{}'. " - "Provide a unique class_id.".format(cls)) - else: - # In this case, the class ID only needs to be meaningful on this - # worker and not across workers. - class_id = _random_string() - - # Make sure class_id is a string. - class_id = ray.utils.binary_to_hex(class_id) - - if job_id is None: - job_id = worker.current_job_id - assert isinstance(job_id, JobID) - - def register_class_for_serialization(worker_info): - if worker_info["worker"].use_pickle: - if pickle.FAST_CLOUDPICKLE_USED: - # construct a reducer - pickle.CloudPickler.dispatch[ - cls] = lambda obj: (deserializer, (serializer(obj), )) - else: - - def _CloudPicklerReducer(_self, obj): - _self.save_reduce( - deserializer, (serializer(obj), ), obj=obj) - - # use a placeholder for 'self' argument - pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer - else: - # TODO(rkn): We need to be more thoughtful about what to do if - # custom serializers have already been registered for class_id. - # In some cases, we may want to use the last user-defined - # serializers and ignore subsequent calls to - # register_custom_serializer that were made by the system. - serialization_context = worker_info[ - "worker"].get_serialization_context(job_id) - serialization_context.register_type( - cls, - class_id, - pickle=use_pickle, - custom_serializer=serializer, - custom_deserializer=deserializer) - - if not local: - worker.run_function_on_all_workers(register_class_for_serialization) - else: - # Since we are pickling objects of this class, we don't actually need - # to ship the class definition. - register_class_for_serialization({"worker": worker}) - - def get(object_ids): """Get a remote object or a list of remote objects from the object store. diff --git a/python/setup.py b/python/setup.py index abf5356ce..c65231fce 100644 --- a/python/setup.py +++ b/python/setup.py @@ -102,7 +102,14 @@ class build_ext(_build_ext.build_ext): for name in filenames: pyarrow_files.append(os.path.join(root, name)) - files_to_include = ray_files + pyarrow_files + # We also need to install pickle5 along with Ray, so make sure that the + # relevant non-Python pickle5 files get copied. + pickle5_files = [] + for (root, dirs, filenames) in os.walk("./ray/pickle5_files/pickle5"): + for name in filenames: + pickle5_files.append(os.path.join(root, name)) + + files_to_include = ray_files + pyarrow_files + pickle5_files # Copy over the autogenerated protobuf Python bindings. for directory in generated_python_directories: