workaround for python3.5 fast numpy serialization (#6675)

This commit is contained in:
Siyuan (Ryans) Zhuang
2020-02-03 16:08:18 -05:00
committed by GitHub
parent 271de9b04d
commit 42cbf801e1
5 changed files with 110 additions and 281 deletions
+1 -4
View File
@@ -580,10 +580,7 @@ cdef write_serialized_object(
(<Pickle5Writer>serialized_object.writer).write_to(
serialized_object.inband, buf, MEMCOPY_THREADS)
else:
buffer = Buffer.make(buf)
stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer))
stream.set_memcopy_threads(MEMCOPY_THREADS)
serialized_object.serialized_object.write_to(stream)
raise TypeError("Unsupported serialization type.")
cdef class CoreWorker:
@@ -20,6 +20,8 @@ import sys
import types
import weakref
import numpy
from .cloudpickle import (
_is_dynamic, _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL,
_find_imported_submodules, _get_cell_contents, _is_global, _builtin_type,
@@ -407,6 +409,44 @@ def _property_reduce(obj):
return property, (obj.fget, obj.fset, obj.fdel, obj.__doc__)
def _numpy_ndarray_reduce(array):
# This function is implemented according to 'array_reduce_ex_picklebuffer'
# in numpy C backend. This is a workaround for python3.5 pickling support.
if sys.version_info >= (3, 8):
import pickle
picklebuf_class = pickle.PickleBuffer
elif sys.version_info >= (3, 5):
try:
import pickle5
picklebuf_class = pickle5.PickleBuffer
except Exception:
raise ImportError("Using pickle protocol 5 requires the pickle5 "
"module for Python >=3.5 and <3.8")
else:
raise ValueError("pickle protocol 5 is not available for Python < 3.5")
# if the array if Fortran-contiguous and not C-contiguous,
# the PickleBuffer instance will hold a view on the transpose
# of the initial array, that is C-contiguous.
if not array.flags.c_contiguous and array.flags.f_contiguous:
order = 'F'
picklebuf_args = array.transpose()
else:
order = 'C'
picklebuf_args = array
try:
buffer = picklebuf_class(picklebuf_args)
except Exception:
# Some arrays may refuse to export a buffer, in which case
# just fall back on regular __reduce_ex__ implementation
# (gh-12745).
return array.__reduce__()
# Get the _frombuffer() function for reconstruction
import numpy.core.numeric as numeric_mod
from_buffer_func = numeric_mod._frombuffer
return from_buffer_func, (buffer, array.dtype, array.shape, order)
class CloudPickler(Pickler):
"""Fast C Pickler extension with additional reducing routines.
@@ -487,6 +527,15 @@ class CloudPickler(Pickler):
for other types that suffered from type-specific reducers, such as
Exceptions. See https://github.com/cloudpipe/cloudpickle/issues/248
"""
# This is a patch for python3.5
if isinstance(obj, numpy.ndarray):
if (self.proto < 5 or
(not obj.flags.c_contiguous and not obj.flags.f_contiguous) or
obj.dtype == 'O' or obj.itemsize == 0):
return NotImplemented
return _numpy_ndarray_reduce(obj)
t = type(obj)
try:
is_anyclass = issubclass(t, type)
+57 -273
View File
@@ -1,9 +1,7 @@
import hashlib
import io
import logging
import time
import pyarrow
import pyarrow.plasma as plasma
import ray.cloudpickle as pickle
@@ -15,7 +13,6 @@ from ray.exceptions import (
RayActorError,
RayWorkerError,
UnreconstructableError,
RAY_EXCEPTION_TYPES,
)
from ray._raylet import Pickle5Writer, unpack_pickle5_buffers
@@ -65,16 +62,6 @@ class Pickle5SerializedObject(SerializedObject):
return self._total_bytes
class ArrowSerializedObject(SerializedObject):
def __init__(self, serialized_object):
super(ArrowSerializedObject, self).__init__(b"")
self.serialized_object = serialized_object
@property
def total_bytes(self):
return self.serialized_object.total_bytes
class RawSerializedObject(SerializedObject):
def __init__(self, value):
super(RawSerializedObject,
@@ -137,6 +124,7 @@ class SerializationContext:
def __init__(self, worker):
self.worker = worker
assert worker.use_pickle
self.use_pickle = worker.use_pickle
def actor_handle_serializer(obj):
@@ -147,175 +135,62 @@ class SerializationContext:
new_handle._deserialization_helper(serialized_obj, True)
return new_handle
if not worker.use_pickle:
serialization_context = pyarrow.default_serialization_context()
# Tell the serialization context to use the cloudpickle version
# that we ship with Ray.
serialization_context.set_pickle(pickle.dumps, pickle.loads)
pyarrow.register_torch_serialization_handlers(
serialization_context)
self._register_cloudpickle_serializer(
ray.actor.ActorHandle,
custom_serializer=actor_handle_serializer,
custom_deserializer=actor_handle_deserializer)
def id_serializer(obj):
return pickle.dumps(obj)
def id_serializer(obj):
return obj.__reduce__()
def id_deserializer(serialized_obj):
return pickle.loads(serialized_obj)
def id_deserializer(serialized_obj):
return serialized_obj[0](*serialized_obj[1])
def object_id_serializer(obj):
owner_id = ""
owner_address = ""
if obj.is_direct_call_type():
worker = ray.worker.get_global_worker()
worker.check_connected()
obj, owner_id, owner_address = (
worker.core_worker.serialize_and_promote_object_id(obj)
)
obj = obj.__reduce__()
owner_id = owner_id.__reduce__() if owner_id else owner_id
return pickle.dumps((obj, owner_id, owner_address))
def object_id_serializer(obj):
owner_id = ""
owner_address = ""
if obj.is_direct_call_type():
worker = ray.worker.get_global_worker()
worker.check_connected()
obj, owner_id, owner_address = (
worker.core_worker.serialize_and_promote_object_id(obj))
obj = id_serializer(obj)
owner_id = id_serializer(owner_id) if owner_id else owner_id
return (obj, owner_id, owner_address)
def object_id_deserializer(serialized_obj):
obj_id, owner_id, owner_address = pickle.loads(serialized_obj)
# NOTE(swang): Must deserialize the object first before asking
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectID is greater than 0 by the
# time the core worker resolves the value of the object.
deserialized_object_id = obj_id[0](obj_id[1][0])
if owner_id:
worker = ray.worker.get_global_worker()
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], owner_id[1][0], owner_address)
return deserialized_object_id
def object_id_deserializer(serialized_obj):
obj_id, owner_id, owner_address = serialized_obj
# NOTE(swang): Must deserialize the object first before asking
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectID is greater than 0 by the
# time the core worker resolves the value of the object.
deserialized_object_id = id_deserializer(obj_id)
if owner_id:
worker = ray.worker.get_global_worker()
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], owner_id[1][0], owner_address)
return deserialized_object_id
for id_type in ray._raylet._ID_TYPES:
if id_type == ray._raylet.ObjectID:
serialization_context.register_type(
id_type,
"{}.{}".format(id_type.__module__, id_type.__name__),
custom_serializer=object_id_serializer,
custom_deserializer=object_id_deserializer)
else:
serialization_context.register_type(
id_type,
"{}.{}".format(id_type.__module__, id_type.__name__),
custom_serializer=id_serializer,
custom_deserializer=id_deserializer)
# We register this serializer on each worker instead of calling
# _register_custom_serializer from the driver so that isinstance
# still works.
serialization_context.register_type(
ray.actor.ActorHandle,
"ray.ActorHandle",
pickle=False,
custom_serializer=actor_handle_serializer,
custom_deserializer=actor_handle_deserializer)
self.pyarrow_context = serialization_context
else:
self._register_cloudpickle_serializer(
ray.actor.ActorHandle,
custom_serializer=actor_handle_serializer,
custom_deserializer=actor_handle_deserializer)
def id_serializer(obj):
return obj.__reduce__()
def id_deserializer(serialized_obj):
return serialized_obj[0](*serialized_obj[1])
def object_id_serializer(obj):
owner_id = ""
owner_address = ""
if obj.is_direct_call_type():
worker = ray.worker.get_global_worker()
worker.check_connected()
obj, owner_id, owner_address = (
worker.core_worker.serialize_and_promote_object_id(obj)
)
obj = id_serializer(obj)
owner_id = id_serializer(owner_id) if owner_id else owner_id
return (obj, owner_id, owner_address)
def object_id_deserializer(serialized_obj):
obj_id, owner_id, owner_address = serialized_obj
# NOTE(swang): Must deserialize the object first before asking
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectID is greater than 0 by the
# time the core worker resolves the value of the object.
deserialized_object_id = id_deserializer(obj_id)
if owner_id:
worker = ray.worker.get_global_worker()
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], owner_id[1][0], owner_address)
return deserialized_object_id
for id_type in ray._raylet._ID_TYPES:
if id_type == ray._raylet.ObjectID:
self._register_cloudpickle_serializer(
id_type, object_id_serializer, object_id_deserializer)
else:
self._register_cloudpickle_serializer(
id_type, id_serializer, id_deserializer)
def initialize(self):
""" Register custom serializers """
if not self.worker.use_pickle:
for error_cls in RAY_EXCEPTION_TYPES:
self.register_custom_serializer(
error_cls,
use_dict=True,
local=True,
class_id=error_cls.__module__ + ". " + error_cls.__name__,
)
# Tell Ray to serialize lambdas with pickle.
self.register_custom_serializer(
type(lambda: 0),
use_pickle=True,
local=True,
class_id="lambda")
# Tell Ray to serialize types with pickle.
self.register_custom_serializer(
type(int), use_pickle=True, local=True, class_id="type")
# Tell Ray to serialize RayParameters as dictionaries. This is
# used when passing around actor handles.
self.register_custom_serializer(
ray.signature.RayParameter,
use_dict=True,
local=True,
class_id="ray.signature.RayParameter")
# Tell Ray to serialize StringIO with pickle. We do this because
# Ray's default __dict__ serialization is incorrect for this type
# (the object's __dict__ is empty and therefore doesn't
# contain the full state of the object).
self.register_custom_serializer(
io.StringIO,
use_pickle=True,
local=True,
class_id="io.StringIO")
for id_type in ray._raylet._ID_TYPES:
if id_type == ray._raylet.ObjectID:
self._register_cloudpickle_serializer(
id_type, object_id_serializer, object_id_deserializer)
else:
self._register_cloudpickle_serializer(id_type, id_serializer,
id_deserializer)
def _register_cloudpickle_serializer(self, cls, custom_serializer,
custom_deserializer):
if pickle.FAST_CLOUDPICKLE_USED:
assert pickle.FAST_CLOUDPICKLE_USED
def _CloudPicklerReducer(obj):
return custom_deserializer, (custom_serializer(obj), )
def _CloudPicklerReducer(obj):
return custom_deserializer, (custom_serializer(obj), )
# construct a reducer
pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer
else:
def _CloudPicklerReducer(_self, obj):
_self.save_reduce(
custom_deserializer, (custom_serializer(obj), ), obj=obj)
# use a placeholder for 'self' argument
pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer
# construct a reducer
pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer
def _deserialize_object(self, data, metadata, object_id):
if metadata:
@@ -352,15 +227,7 @@ class SerializationContext:
"Tried to get object that has been promoted to plasma."
assert False, "Unrecognized error type " + str(error_type)
elif data:
if self.use_pickle:
raise ValueError("Receiving plasma serialized objects "
"while the serialization context is "
"using pickle5 as the backend.")
try:
# If data is not empty, deserialize the object.
return pyarrow.deserialize(data, self.pyarrow_context)
except pyarrow.DeserializationCallbackError:
raise DeserializationError()
raise ValueError("non-null object should always have metadata")
else:
# Object isn't available in plasma. This should never be returned
# to the user. We should only reach this line if this object was
@@ -368,64 +235,10 @@ class SerializationContext:
# throws an exception.
return plasma.ObjectNotAvailable
def _store_and_register_pyarrow(self, value, depth=100):
"""Store an object and attempt to register its class if needed.
Args:
value: The value to put in the object store.
depth: The maximum number of classes to recursively register.
Raises:
Exception: An exception is raised if the attempt to serialize the
object fails.
"""
counter = 0
while True:
if counter == depth:
raise Exception("Ray exceeded the maximum number of classes "
"that it will recursively serialize when "
"attempting to serialize an object of "
"type {}.".format(type(value)))
counter += 1
try:
return pyarrow.serialize(value, self.pyarrow_context)
except pyarrow.SerializationCallbackError as e:
cls_type = type(e.example_object)
try:
self.register_custom_serializer(cls_type, use_dict=True)
warning_message = (
"WARNING: Serializing objects of type "
"{} by expanding them as dictionaries "
"of their fields. This behavior may "
"be incorrect in some cases.".format(cls_type))
logger.debug(warning_message)
except (RayNotDictionarySerializable, CloudPickleError,
pickle.pickle.PicklingError, Exception):
# We also handle generic exceptions here because
# cloudpickle can fail with many different types of errors.
warning_message = (
"Falling back to serializing {} objects by using "
"pickle. Use `ray.register_custom_serializer({},...)` "
"to provide faster serialization.".format(
cls_type, cls_type))
try:
self.register_custom_serializer(
cls_type, use_pickle=True)
logger.warning(warning_message)
except (CloudPickleError, ValueError):
self.register_custom_serializer(
cls_type, use_pickle=True, local=True)
warning_message = ("WARNING: Pickling the class {} "
"failed, so we are using pickle "
"and only registering the class "
"locally.".format(cls_type))
logger.warning(warning_message)
def deserialize_objects(self,
data_metadata_pairs,
object_ids,
error_timeout=10):
pass
assert len(data_metadata_pairs) == len(object_ids)
start_time = time.time()
@@ -473,29 +286,12 @@ class SerializationContext:
# that this object can also be read by Java.
return RawSerializedObject(value)
if self.worker.use_pickle:
writer = Pickle5Writer()
if ray.cloudpickle.FAST_CLOUDPICKLE_USED:
inband = pickle.dumps(
value, protocol=5, buffer_callback=writer.buffer_callback)
else:
inband = pickle.dumps(value)
return Pickle5SerializedObject(inband, writer)
else:
try:
serialized_value = self._store_and_register_pyarrow(value)
except TypeError:
# TypeError can happen because one of the members of the object
# may not be serializable for cloudpickle. So we need
# these extra fallbacks here to start from the beginning.
# Hopefully the object could have a `__reduce__` method.
self.register_custom_serializer(type(value), use_pickle=True)
logger.warning("WARNING: Serializing the class {} failed, "
"falling back to cloudpickle.".format(
type(value)))
serialized_value = self._store_and_register_pyarrow(value)
return ArrowSerializedObject(serialized_value)
assert self.worker.use_pickle
assert ray.cloudpickle.FAST_CLOUDPICKLE_USED
writer = Pickle5Writer()
inband = pickle.dumps(
value, protocol=5, buffer_callback=writer.buffer_callback)
return Pickle5SerializedObject(inband, writer)
def register_custom_serializer(self,
cls,
@@ -583,22 +379,10 @@ class SerializationContext:
assert isinstance(job_id, JobID)
def register_class_for_serialization(worker_info):
assert worker_info["worker"].use_pickle
context = worker_info["worker"].get_serialization_context(job_id)
if worker_info["worker"].use_pickle:
context._register_cloudpickle_serializer(
cls, serializer, deserializer)
else:
# TODO(rkn): We need to be more thoughtful about what to do if
# custom serializers have already been registered for
# class_id. In some cases, we may want to use the last
# user-defined serializers and ignore subsequent calls to
# register_custom_serializer that were made by the system.
context.pyarrow_context.register_type(
cls,
class_id,
pickle=use_pickle,
custom_serializer=serializer,
custom_deserializer=deserializer)
context._register_cloudpickle_serializer(cls, serializer,
deserializer)
if not local:
self.worker.run_function_on_all_workers(
-1
View File
@@ -212,7 +212,6 @@ class Worker:
if job_id not in self.serialization_context_map:
self.serialization_context_map[
job_id] = serialization.SerializationContext(self)
self.serialization_context_map[job_id].initialize()
return self.serialization_context_map[job_id]
def check_connected(self):