From ddd62a177fb2cb82a3dc267705fa01ed6e7decc6 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Mon, 31 Aug 2020 20:41:37 -0700 Subject: [PATCH] Revert "[Serialization] Update CloudPickle to 1.6.0 (#9694)" (#10460) This reverts commit f0c3910d598e7964a28b4245bd6e4024114bdbd2. --- python/ray/cloudpickle/__init__.py | 12 +- python/ray/cloudpickle/cloudpickle.py | 760 ++++++++++++++++++--- python/ray/cloudpickle/cloudpickle_fast.py | 573 +++++++--------- python/ray/cloudpickle/compat.py | 13 - python/ray/includes/serialization.pxi | 4 +- python/ray/tests/BUILD | 2 +- python/ray/tests/test_basic.py | 47 ++ python/ray/tests/test_serialization.py | 46 -- 8 files changed, 936 insertions(+), 521 deletions(-) delete mode 100644 python/ray/cloudpickle/compat.py diff --git a/python/ray/cloudpickle/__init__.py b/python/ray/cloudpickle/__init__.py index b28e91ee8..91c46bb76 100644 --- a/python/ray/cloudpickle/__init__.py +++ b/python/ray/cloudpickle/__init__.py @@ -1,11 +1,3 @@ -from __future__ import absolute_import +from ray.cloudpickle.cloudpickle_fast import * # noqa: F401, F403 - -from ray.cloudpickle.cloudpickle import * # noqa -from ray.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump # noqa - -# Conform to the convention used by python serialization libraries, which -# expose their Pickler subclass at top-level under the "Pickler" name. -Pickler = CloudPickler - -__version__ = '1.6.0' +__version__ = '1.4.1' diff --git a/python/ray/cloudpickle/cloudpickle.py b/python/ray/cloudpickle/cloudpickle.py index 05d52afa0..c639daab1 100644 --- a/python/ray/cloudpickle/cloudpickle.py +++ b/python/ray/cloudpickle/cloudpickle.py @@ -42,21 +42,29 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ from __future__ import print_function +import abc import builtins import dis +import io +import itertools +import logging import opcode +import operator +import pickle import platform +import struct import sys import types import weakref import uuid import threading import typing -import warnings +from enum import Enum -from .compat import pickle from typing import Generic, Union, Tuple, Callable +from pickle import _Pickler as Pickler from pickle import _getattribute +from io import BytesIO from importlib._bootstrap import _find_spec try: # pragma: no branch @@ -70,17 +78,6 @@ if sys.version_info >= (3, 5, 3): else: # pragma: no cover ClassVar = None -if sys.version_info >= (3, 8): - from types import CellType -else: - def f(): - a = 1 - - def g(): - return a - return g - CellType = type(f().__closure__[0]) - # cloudpickle is meant for inter process communication: we expect all # communicating processes to run the same Python version hence we favor @@ -166,24 +163,9 @@ def _whichmodule(obj, name): return None -def _is_importable(obj, name=None): - """Dispatcher utility to test the importability of various constructs.""" - if isinstance(obj, types.FunctionType): - return _lookup_module_and_qualname(obj, name=name) is not None - elif issubclass(type(obj), type): - return _lookup_module_and_qualname(obj, name=name) is not None - elif isinstance(obj, types.ModuleType): - # We assume that sys.modules is primarily used as a cache mechanism for - # the Python import machinery. Checking if a module has been added in - # is sys.modules therefore a cheap and simple heuristic to tell us whether - # we can assume that a given module could be imported by name in - # another Python process. - return obj.__name__ in sys.modules - else: - raise TypeError( - "cannot check importability of {} instances".format( - type(obj).__name__) - ) +def _is_importable_by_name(obj, name=None): + """Determine if obj can be pickled as attribute of a file-backed module""" + return _lookup_module_and_qualname(obj, name=name) is not None def _lookup_module_and_qualname(obj, name=None): @@ -205,8 +187,6 @@ def _lookup_module_and_qualname(obj, name=None): if module_name == "__main__": return None - # Note: if module_name is in sys.modules, the corresponding module is - # assumed importable at unpickling time. See #357 module = sys.modules.get(module_name, None) if module is None: # The main reason why obj's module would not be imported is that this @@ -216,6 +196,10 @@ def _lookup_module_and_qualname(obj, name=None): # supported, as the standard pickle does not support it either. return None + # module has been added to sys.modules, but it can still be dynamic. + if _is_dynamic(module): + return None + try: obj2, parent = _getattribute(module, name) except AttributeError: @@ -474,61 +458,577 @@ if sys.version_info[:2] < (3, 7): # pragma: no branch def _create_parametrized_type_hint(origin, args): return origin[args] -else: - _is_parametrized_type_hint = None - _create_parametrized_type_hint = None -def parametrized_type_hint_getinitargs(obj): - # The distorted type check sematic for typing construct becomes: - # ``type(obj) is type(TypeHint)``, which means "obj is a - # parametrized TypeHint" - if type(obj) is type(Literal): # pragma: no branch - initargs = (Literal, obj.__values__) - elif type(obj) is type(Final): # pragma: no branch - initargs = (Final, obj.__type__) - elif type(obj) is type(ClassVar): - initargs = (ClassVar, obj.__type__) - elif type(obj) is type(Generic): - parameters = obj.__parameters__ - if len(obj.__parameters__) > 0: - # in early Python 3.5, __parameters__ was sometimes - # preferred to __args__ - initargs = (obj.__origin__, parameters) +class CloudPickler(Pickler): - else: - initargs = (obj.__origin__, obj.__args__) - elif type(obj) is type(Union): - if sys.version_info < (3, 5, 3): # pragma: no cover - initargs = (Union, obj.__union_params__) - else: - initargs = (Union, obj.__args__) - elif type(obj) is type(Tuple): - if sys.version_info < (3, 5, 3): # pragma: no cover - initargs = (Tuple, obj.__tuple_params__) - else: - initargs = (Tuple, obj.__args__) - elif type(obj) is type(Callable): - if sys.version_info < (3, 5, 3): # pragma: no cover - args = obj.__args__ - result = obj.__result__ - if args != Ellipsis: - if isinstance(args, tuple): - args = list(args) - else: - args = [args] - else: - (*args, result) = obj.__args__ - if len(args) == 1 and args[0] is Ellipsis: - args = Ellipsis + dispatch = Pickler.dispatch.copy() + + def __init__(self, file, protocol=None): + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__(self, file, protocol=protocol) + # map ids to dictionary. used to ensure that functions can share global env + self.globals_ref = {} + + def dump(self, obj): + self.inject_addons() + try: + return Pickler.dump(self, obj) + except RuntimeError as e: + if 'recursion' in e.args[0]: + msg = """Could not pickle object as excessively deep recursion required.""" + raise pickle.PicklingError(msg) else: - args = list(args) - initargs = (Callable, (args, result)) - else: # pragma: no cover - raise pickle.PicklingError( - "Cloudpickle Error: Unknown type {}".format(type(obj)) + raise + + def save_typevar(self, obj): + self.save_reduce(*_typevar_reduce(obj), obj=obj) + + dispatch[typing.TypeVar] = save_typevar + + def save_memoryview(self, obj): + self.save(obj.tobytes()) + + dispatch[memoryview] = save_memoryview + + def save_module(self, obj): + """ + Save a module as an import + """ + if _is_dynamic(obj): + obj.__dict__.pop('__builtins__', None) + self.save_reduce(dynamic_subimport, (obj.__name__, vars(obj)), + obj=obj) + else: + self.save_reduce(subimport, (obj.__name__,), obj=obj) + + dispatch[types.ModuleType] = save_module + + def save_codeobject(self, obj): + """ + Save a code object + """ + if hasattr(obj, "co_posonlyargcount"): # pragma: no branch + args = ( + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, + obj.co_cellvars + ) + else: + args = ( + obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, + obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, + obj.co_names, obj.co_varnames, obj.co_filename, + obj.co_name, obj.co_firstlineno, obj.co_lnotab, + obj.co_freevars, obj.co_cellvars + ) + self.save_reduce(types.CodeType, args, obj=obj) + + dispatch[types.CodeType] = save_codeobject + + def save_function(self, obj, name=None): + """ Registered with the dispatch to handle all function types. + + Determines what kind of function obj is (e.g. lambda, defined at + interactive prompt, etc) and handles the pickling appropriately. + """ + if _is_importable_by_name(obj, name=name): + return Pickler.save_global(self, obj, name=name) + elif PYPY and isinstance(obj.__code__, builtin_code_type): + return self.save_pypy_builtin_func(obj) + else: + return self.save_function_tuple(obj) + + dispatch[types.FunctionType] = save_function + + def save_pypy_builtin_func(self, obj): + """Save pypy equivalent of builtin functions. + + PyPy does not have the concept of builtin-functions. Instead, + builtin-functions are simple function instances, but with a + builtin-code attribute. + Most of the time, builtin functions should be pickled by attribute. But + PyPy has flaky support for __qualname__, so some builtin functions such + as float.__new__ will be classified as dynamic. For this reason only, + we created this special routine. Because builtin-functions are not + expected to have closure or globals, there is no additional hack + (compared the one already implemented in pickle) to protect ourselves + from reference cycles. A simple (reconstructor, newargs, obj.__dict__) + tuple is save_reduced. + + Note also that PyPy improved their support for __qualname__ in v3.6, so + this routing should be removed when cloudpickle supports only PyPy 3.6 + and later. + """ + rv = (types.FunctionType, (obj.__code__, {}, obj.__name__, + obj.__defaults__, obj.__closure__), + obj.__dict__) + self.save_reduce(*rv, obj=obj) + + def _save_dynamic_enum(self, obj, clsdict): + """Special handling for dynamic Enum subclasses + + Use a dedicated Enum constructor (inspired by EnumMeta.__call__) as the + EnumMeta metaclass has complex initialization that makes the Enum + subclasses hold references to their own instances. + """ + members = dict((e.name, e.value) for e in obj) + + self.save_reduce( + _make_skeleton_enum, + (obj.__bases__, obj.__name__, obj.__qualname__, + members, obj.__module__, _get_or_create_tracker_id(obj), None), + obj=obj + ) + + # Cleanup the clsdict that will be passed to _rehydrate_skeleton_class: + # Those attributes are already handled by the metaclass. + for attrname in ["_generate_next_value_", "_member_names_", + "_member_map_", "_member_type_", + "_value2member_map_"]: + clsdict.pop(attrname, None) + for member in members: + clsdict.pop(member) + + def save_dynamic_class(self, obj): + """Save a class that can't be stored as module global. + + This method is used to serialize classes that are defined inside + functions, or that otherwise can't be serialized as attribute lookups + from global modules. + """ + clsdict = _extract_class_dict(obj) + clsdict.pop('__weakref__', None) + + if issubclass(type(obj), abc.ABCMeta): + # If obj is an instance of an ABCMeta subclass, dont pickle the + # cache/negative caches populated during isinstance/issubclass + # checks, but pickle the list of registered subclasses of obj. + clsdict.pop('_abc_cache', None) + clsdict.pop('_abc_negative_cache', None) + clsdict.pop('_abc_negative_cache_version', None) + registry = clsdict.pop('_abc_registry', None) + if registry is None: + # in Python3.7+, the abc caches and registered subclasses of a + # class are bundled into the single _abc_impl attribute + clsdict.pop('_abc_impl', None) + (registry, _, _, _) = abc._get_dump(obj) + + clsdict["_abc_impl"] = [subclass_weakref() + for subclass_weakref in registry] + else: + # In the above if clause, registry is a set of weakrefs -- in + # this case, registry is a WeakSet + clsdict["_abc_impl"] = [type_ for type_ in registry] + + # On PyPy, __doc__ is a readonly attribute, so we need to include it in + # the initial skeleton class. This is safe because we know that the + # doc can't participate in a cycle with the original class. + type_kwargs = {'__doc__': clsdict.pop('__doc__', None)} + + if "__slots__" in clsdict: + type_kwargs['__slots__'] = obj.__slots__ + # pickle string length optimization: member descriptors of obj are + # created automatically from obj's __slots__ attribute, no need to + # save them in obj's state + if isinstance(obj.__slots__, str): + clsdict.pop(obj.__slots__) + else: + for k in obj.__slots__: + clsdict.pop(k, None) + + # If type overrides __dict__ as a property, include it in the type + # kwargs. In Python 2, we can't set this attribute after construction. + # XXX: can this ever happen in Python 3? If so add a test. + __dict__ = clsdict.pop('__dict__', None) + if isinstance(__dict__, property): + type_kwargs['__dict__'] = __dict__ + + save = self.save + write = self.write + + # We write pickle instructions explicitly here to handle the + # possibility that the type object participates in a cycle with its own + # __dict__. We first write an empty "skeleton" version of the class and + # memoize it before writing the class' __dict__ itself. We then write + # instructions to "rehydrate" the skeleton class by restoring the + # attributes from the __dict__. + # + # A type can appear in a cycle with its __dict__ if an instance of the + # type appears in the type's __dict__ (which happens for the stdlib + # Enum class), or if the type defines methods that close over the name + # of the type, (which is common for Python 2-style super() calls). + + # Push the rehydration function. + save(_rehydrate_skeleton_class) + + # Mark the start of the args tuple for the rehydration function. + write(pickle.MARK) + + # Create and memoize an skeleton class with obj's name and bases. + if Enum is not None and issubclass(obj, Enum): + # Special handling of Enum subclasses + self._save_dynamic_enum(obj, clsdict) + else: + # "Regular" class definition: + tp = type(obj) + self.save_reduce(_make_skeleton_class, + (tp, obj.__name__, _get_bases(obj), type_kwargs, + _get_or_create_tracker_id(obj), None), + obj=obj) + + # Now save the rest of obj's __dict__. Any references to obj + # encountered while saving will point to the skeleton class. + save(clsdict) + + # Write a tuple of (skeleton_class, clsdict). + write(pickle.TUPLE) + + # Call _rehydrate_skeleton_class(skeleton_class, clsdict) + write(pickle.REDUCE) + + def save_function_tuple(self, func): + """ Pickles an actual func object. + + A func comprises: code, globals, defaults, closure, and dict. We + extract and save these, injecting reducing functions at certain points + to recreate the func object. Keep in mind that some of these pieces + can contain a ref to the func itself. Thus, a naive save on these + pieces could trigger an infinite loop of save's. To get around that, + we first create a skeleton func object using just the code (this is + safe, since this won't contain a ref to the func), and memoize it as + soon as it's created. The other stuff can then be filled in later. + """ + if is_tornado_coroutine(func): + self.save_reduce(_rebuild_tornado_coroutine, (func.__wrapped__,), + obj=func) + return + + save = self.save + write = self.write + + code, f_globals, defaults, closure_values, dct, base_globals = self.extract_func_data(func) + + save(_fill_function) # skeleton function updater + write(pickle.MARK) # beginning of tuple that _fill_function expects + + # Extract currently-imported submodules used by func. Storing these + # modules in a smoke _cloudpickle_subimports attribute of the object's + # state will trigger the side effect of importing these modules at + # unpickling time (which is necessary for func to work correctly once + # depickled) + submodules = _find_imported_submodules( + code, + itertools.chain(f_globals.values(), closure_values or ()), ) - return initargs + + # create a skeleton function object and memoize it + save(_make_skel_func) + save(( + code, + len(closure_values) if closure_values is not None else -1, + base_globals, + )) + write(pickle.REDUCE) + self.memoize(func) + + # save the rest of the func data needed by _fill_function + state = { + 'globals': f_globals, + 'defaults': defaults, + 'dict': dct, + 'closure_values': closure_values, + 'module': func.__module__, + 'name': func.__name__, + 'doc': func.__doc__, + '_cloudpickle_submodules': submodules + } + if hasattr(func, '__annotations__'): + state['annotations'] = func.__annotations__ + if hasattr(func, '__qualname__'): + state['qualname'] = func.__qualname__ + if hasattr(func, '__kwdefaults__'): + state['kwdefaults'] = func.__kwdefaults__ + save(state) + write(pickle.TUPLE) + write(pickle.REDUCE) # applies _fill_function on the tuple + + def extract_func_data(self, func): + """ + Turn the function into a tuple of data necessary to recreate it: + code, globals, defaults, closure_values, dict + """ + code = func.__code__ + + # extract all global ref's + func_global_refs = _extract_code_globals(code) + + # process all variables referenced by global environment + f_globals = {} + for var in func_global_refs: + if var in func.__globals__: + f_globals[var] = func.__globals__[var] + + # defaults requires no processing + defaults = func.__defaults__ + + # process closure + closure = ( + list(map(_get_cell_contents, func.__closure__)) + if func.__closure__ is not None + else None + ) + + # save the dict + dct = func.__dict__ + + # base_globals represents the future global namespace of func at + # unpickling time. Looking it up and storing it in globals_ref allow + # functions sharing the same globals at pickling time to also + # share them once unpickled, at one condition: since globals_ref is + # an attribute of a Cloudpickler instance, and that a new CloudPickler is + # created each time pickle.dump or pickle.dumps is called, functions + # also need to be saved within the same invokation of + # cloudpickle.dump/cloudpickle.dumps (for example: cloudpickle.dumps([f1, f2])). There + # is no such limitation when using Cloudpickler.dump, as long as the + # multiple invokations are bound to the same Cloudpickler. + base_globals = self.globals_ref.setdefault(id(func.__globals__), {}) + + if base_globals == {}: + # Add module attributes used to resolve relative imports + # instructions inside func. + for k in ["__package__", "__name__", "__path__", "__file__"]: + # Some built-in functions/methods such as object.__new__ have + # their __globals__ set to None in PyPy + if func.__globals__ is not None and k in func.__globals__: + base_globals[k] = func.__globals__[k] + + return (code, f_globals, defaults, closure, dct, base_globals) + + def save_getset_descriptor(self, obj): + return self.save_reduce(getattr, (obj.__objclass__, obj.__name__)) + + dispatch[types.GetSetDescriptorType] = save_getset_descriptor + + def save_global(self, obj, name=None, pack=struct.pack): + """ + Save a "global". + + The name of this method is somewhat misleading: all types get + dispatched here. + """ + if obj is type(None): + return self.save_reduce(type, (None,), obj=obj) + elif obj is type(Ellipsis): + return self.save_reduce(type, (Ellipsis,), obj=obj) + elif obj is type(NotImplemented): + return self.save_reduce(type, (NotImplemented,), obj=obj) + elif obj in _BUILTIN_TYPE_NAMES: + return self.save_reduce( + _builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj) + + if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch + # Parametrized typing constructs in Python < 3.7 are not compatible + # with type checks and ``isinstance`` semantics. For this reason, + # it is easier to detect them using a duck-typing-based check + # (``_is_parametrized_type_hint``) than to populate the Pickler's + # dispatch with type-specific savers. + self._save_parametrized_type_hint(obj) + elif name is not None: + Pickler.save_global(self, obj, name=name) + elif not _is_importable_by_name(obj, name=name): + self.save_dynamic_class(obj) + else: + Pickler.save_global(self, obj, name=name) + + dispatch[type] = save_global + + def save_instancemethod(self, obj): + # Memoization rarely is ever useful due to python bounding + if obj.__self__ is None: + self.save_reduce(getattr, (obj.im_class, obj.__name__)) + else: + self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj) + + dispatch[types.MethodType] = save_instancemethod + + def save_property(self, obj): + # properties not correctly saved in python + self.save_reduce(property, (obj.fget, obj.fset, obj.fdel, obj.__doc__), + obj=obj) + + dispatch[property] = save_property + + def save_classmethod(self, obj): + orig_func = obj.__func__ + self.save_reduce(type(obj), (orig_func,), obj=obj) + + dispatch[classmethod] = save_classmethod + dispatch[staticmethod] = save_classmethod + + def save_itemgetter(self, obj): + """itemgetter serializer (needed for namedtuple support)""" + class Dummy: + def __getitem__(self, item): + return item + items = obj(Dummy()) + if not isinstance(items, tuple): + items = (items,) + return self.save_reduce(operator.itemgetter, items) + + if type(operator.itemgetter) is type: + dispatch[operator.itemgetter] = save_itemgetter + + def save_attrgetter(self, obj): + """attrgetter serializer""" + class Dummy(object): + def __init__(self, attrs, index=None): + self.attrs = attrs + self.index = index + def __getattribute__(self, item): + attrs = object.__getattribute__(self, "attrs") + index = object.__getattribute__(self, "index") + if index is None: + index = len(attrs) + attrs.append(item) + else: + attrs[index] = ".".join([attrs[index], item]) + return type(self)(attrs, index) + attrs = [] + obj(Dummy(attrs)) + return self.save_reduce(operator.attrgetter, tuple(attrs)) + + if type(operator.attrgetter) is type: + dispatch[operator.attrgetter] = save_attrgetter + + def save_file(self, obj): + """Save a file""" + + if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): + raise pickle.PicklingError("Cannot pickle files that do not map to an actual file") + if obj is sys.stdout: + return self.save_reduce(getattr, (sys, 'stdout'), obj=obj) + if obj is sys.stderr: + return self.save_reduce(getattr, (sys, 'stderr'), obj=obj) + if obj is sys.stdin: + raise pickle.PicklingError("Cannot pickle standard input") + if obj.closed: + raise pickle.PicklingError("Cannot pickle closed files") + if hasattr(obj, 'isatty') and obj.isatty(): + raise pickle.PicklingError("Cannot pickle files that map to tty objects") + if 'r' not in obj.mode and '+' not in obj.mode: + raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode) + + name = obj.name + + # TODO: also support binary mode files with io.BytesIO + retval = io.StringIO() + + try: + # Read the whole file + curloc = obj.tell() + obj.seek(0) + contents = obj.read() + obj.seek(curloc) + except IOError: + raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name) + retval.write(contents) + retval.seek(curloc) + + retval.name = name + self.save(retval) + self.memoize(obj) + + def save_ellipsis(self, obj): + self.save_reduce(_gen_ellipsis, ()) + + def save_not_implemented(self, obj): + self.save_reduce(_gen_not_implemented, ()) + + dispatch[io.TextIOWrapper] = save_file + dispatch[type(Ellipsis)] = save_ellipsis + dispatch[type(NotImplemented)] = save_not_implemented + + def save_weakset(self, obj): + self.save_reduce(weakref.WeakSet, (list(obj),)) + + dispatch[weakref.WeakSet] = save_weakset + + def save_logger(self, obj): + self.save_reduce(logging.getLogger, (obj.name,), obj=obj) + + dispatch[logging.Logger] = save_logger + + def save_root_logger(self, obj): + self.save_reduce(logging.getLogger, (), obj=obj) + + dispatch[logging.RootLogger] = save_root_logger + + if hasattr(types, "MappingProxyType"): # pragma: no branch + def save_mappingproxy(self, obj): + self.save_reduce(types.MappingProxyType, (dict(obj),), obj=obj) + + dispatch[types.MappingProxyType] = save_mappingproxy + + """Special functions for Add-on libraries""" + def inject_addons(self): + """Plug in system. Register additional pickling functions if modules already loaded""" + pass + + if sys.version_info < (3, 7): # pragma: no branch + def _save_parametrized_type_hint(self, obj): + # The distorted type check sematic for typing construct becomes: + # ``type(obj) is type(TypeHint)``, which means "obj is a + # parametrized TypeHint" + if type(obj) is type(Literal): # pragma: no branch + initargs = (Literal, obj.__values__) + elif type(obj) is type(Final): # pragma: no branch + initargs = (Final, obj.__type__) + elif type(obj) is type(ClassVar): + initargs = (ClassVar, obj.__type__) + elif type(obj) is type(Generic): + parameters = obj.__parameters__ + if len(obj.__parameters__) > 0: + # in early Python 3.5, __parameters__ was sometimes + # preferred to __args__ + initargs = (obj.__origin__, parameters) + else: + initargs = (obj.__origin__, obj.__args__) + elif type(obj) is type(Union): + if sys.version_info < (3, 5, 3): # pragma: no cover + initargs = (Union, obj.__union_params__) + else: + initargs = (Union, obj.__args__) + elif type(obj) is type(Tuple): + if sys.version_info < (3, 5, 3): # pragma: no cover + initargs = (Tuple, obj.__tuple_params__) + else: + initargs = (Tuple, obj.__args__) + elif type(obj) is type(Callable): + if sys.version_info < (3, 5, 3): # pragma: no cover + args = obj.__args__ + result = obj.__result__ + if args != Ellipsis: + if isinstance(args, tuple): + args = list(args) + else: + args = [args] + else: + (*args, result) = obj.__args__ + if len(args) == 1 and args[0] is Ellipsis: + args = Ellipsis + else: + args = list(args) + initargs = (Callable, (args, result)) + else: # pragma: no cover + raise pickle.PicklingError( + "Cloudpickle Error: Unknown type {}".format(type(obj)) + ) + self.save_reduce(_create_parametrized_type_hint, initargs, obj=obj) # Tornado support @@ -552,6 +1052,40 @@ def _rebuild_tornado_coroutine(func): return gen.coroutine(func) +# Shorthands for legacy support + +def dump(obj, file, protocol=None): + """Serialize obj as bytes streamed into file + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed + between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + CloudPickler(file, protocol=protocol).dump(obj) + + +def dumps(obj, protocol=None): + """Serialize obj as a string of bytes allocated in memory + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed + between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + file = BytesIO() + try: + cp = CloudPickler(file, protocol=protocol) + cp.dump(obj) + return file.getvalue() + finally: + file.close() + + # including pickles unloading functions in this namespace load = pickle.load loads = pickle.loads @@ -650,7 +1184,7 @@ def _fill_function(*args): if 'annotations' in state: func.__annotations__ = state['annotations'] if 'doc' in state: - func.__doc__ = state['doc'] + func.__doc__ = state['doc'] if 'name' in state: func.__name__ = state['name'] if 'module' in state: @@ -685,24 +1219,11 @@ def _make_empty_cell(): return (lambda: cell).__closure__[0] -def _make_cell(value=_empty_cell_value): - cell = _make_empty_cell() - if value is not _empty_cell_value: - cell_set(cell, value) - return cell - - def _make_skel_func(code, cell_count, base_globals=None): """ Creates a skeleton function object that contains just the provided code and the correct number of cells in func_closure. All other func attributes (e.g. func_globals) are empty. """ - # This function is deprecated and should be removed in cloudpickle 1.7 - warnings.warn( - "A pickle file created using an old (<=1.4.1) version of cloudpicke " - "is currently being loaded. This is not supported by cloudpickle and " - "will break in cloudpickle 1.7", category=UserWarning - ) # This is backward-compatibility code: for cloudpickle versions between # 0.5.4 and 0.7, base_globals could be a string or None. base_globals # should now always be a dictionary. @@ -786,6 +1307,39 @@ def _make_skeleton_enum(bases, name, qualname, members, module, return _lookup_class_or_track(class_tracker_id, enum_class) +def _is_dynamic(module): + """ + Return True if the module is special module that cannot be imported by its + name. + """ + # Quick check: module that have __file__ attribute are not dynamic modules. + if hasattr(module, '__file__'): + return False + + if module.__spec__ is not None: + return False + + # In PyPy, Some built-in modules such as _codecs can have their + # __spec__ attribute set to None despite being imported. For such + # modules, the ``_find_spec`` utility of the standard library is used. + parent_name = module.__name__.rpartition('.')[0] + if parent_name: # pragma: no cover + # This code handles the case where an imported package (and not + # module) remains with __spec__ set to None. It is however untested + # as no package in the PyPy stdlib has __spec__ set to None after + # it is imported. + try: + parent = sys.modules[parent_name] + except KeyError: + msg = "parent {!r} not in sys.modules" + raise ImportError(msg.format(parent_name)) + else: + pkgpath = parent.__path__ + else: + pkgpath = None + return _find_spec(module.__name__, pkgpath, module) is None + + def _make_typevar(name, bound, constraints, covariant, contravariant, class_tracker_id): tv = typing.TypeVar( @@ -828,15 +1382,3 @@ def _get_bases(typ): # For regular class objects bases_attr = '__bases__' return getattr(typ, bases_attr) - - -def _make_dict_keys(obj): - return dict.fromkeys(obj).keys() - - -def _make_dict_values(obj): - return {i: _ for i, _ in enumerate(obj)}.values() - - -def _make_dict_items(obj): - return obj.items() diff --git a/python/ray/cloudpickle/cloudpickle_fast.py b/python/ray/cloudpickle/cloudpickle_fast.py index fa8da0f63..fd16493b8 100644 --- a/python/ray/cloudpickle/cloudpickle_fast.py +++ b/python/ray/cloudpickle/cloudpickle_fast.py @@ -10,100 +10,65 @@ Note that the C Pickler sublassing API is CPython-specific. Therefore, some guards present in cloudpickle.py that were written to handle PyPy specificities are not present in cloudpickle_fast.py """ -import _collections_abc import abc import copyreg import io import itertools import logging import sys -import struct import types import weakref import typing -from enum import Enum -from collections import ChainMap - -from .compat import pickle, Pickler from .cloudpickle import ( - _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, - _find_imported_submodules, _get_cell_contents, _is_importable, - _builtin_type, _get_or_create_tracker_id, _make_skeleton_class, - _make_skeleton_enum, _extract_class_dict, dynamic_subimport, subimport, - _typevar_reduce, _get_bases, _make_cell, _make_empty_cell, CellType, - _is_parametrized_type_hint, PYPY, cell_set, - parametrized_type_hint_getinitargs, _create_parametrized_type_hint, - builtin_code_type, - _make_dict_keys, _make_dict_values, _make_dict_items, + _is_dynamic, _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, + _find_imported_submodules, _get_cell_contents, _is_importable_by_name, _builtin_type, + Enum, _get_or_create_tracker_id, _make_skeleton_class, _make_skeleton_enum, + _extract_class_dict, dynamic_subimport, subimport, _typevar_reduce, _get_bases, + cell_set, _make_empty_cell, ) - -if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY: - # Shorthands similar to pickle.dump/pickle.dumps - - def dump(obj, file, protocol=None, buffer_callback=None): - """Serialize obj as bytes streamed into file - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication - speed between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - CloudPickler( - file, protocol=protocol, buffer_callback=buffer_callback - ).dump(obj) - - def dumps(obj, protocol=None, buffer_callback=None): - """Serialize obj as a string of bytes allocated in memory - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication - speed between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - with io.BytesIO() as file: - cp = CloudPickler( - file, protocol=protocol, buffer_callback=buffer_callback - ) - cp.dump(obj) - return file.getvalue() - +if sys.version_info[:2] < (3, 8): + import pickle5 as pickle + from pickle5 import Pickler + load, loads = pickle.load, pickle.loads else: - # Shorthands similar to pickle.dump/pickle.dumps - def dump(obj, file, protocol=None): - """Serialize obj as bytes streamed into file + import _pickle + import pickle + from _pickle import Pickler + load, loads = _pickle.load, _pickle.loads - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication - speed between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - CloudPickler(file, protocol=protocol).dump(obj) - - def dumps(obj, protocol=None): - """Serialize obj as a string of bytes allocated in memory - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication - speed between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - with io.BytesIO() as file: - cp = CloudPickler(file, protocol=protocol) - cp.dump(obj) - return file.getvalue() +import numpy -load, loads = pickle.load, pickle.loads +# Shorthands similar to pickle.dump/pickle.dumps +def dump(obj, file, protocol=None, buffer_callback=None): + """Serialize obj as bytes streamed into file + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed + between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj) + + +def dumps(obj, protocol=None, buffer_callback=None): + """Serialize obj as a string of bytes allocated in memory + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed + between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + with io.BytesIO() as file: + cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback) + cp.dump(obj) + return file.getvalue() # COLLECTION OF OBJECTS __getnewargs__-LIKE METHODS @@ -186,15 +151,21 @@ def _class_getstate(obj): clsdict.pop('_abc_cache', None) clsdict.pop('_abc_negative_cache', None) clsdict.pop('_abc_negative_cache_version', None) + clsdict.pop('_abc_impl', None) registry = clsdict.pop('_abc_registry', None) if registry is None: # in Python3.7+, the abc caches and registered subclasses of a # class are bundled into the single _abc_impl attribute - clsdict.pop('_abc_impl', None) - (registry, _, _, _) = abc._get_dump(obj) - - clsdict["_abc_impl"] = [subclass_weakref() - for subclass_weakref in registry] + if hasattr(abc, '_get_dump'): + (registry, _, _, _) = abc._get_dump(obj) + clsdict["_abc_impl"] = [subclass_weakref() + for subclass_weakref in registry] + else: + # FIXME(suquark): The upstream cloudpickle cannot work in Ray + # because sometimes both '_abc_registry' and '_get_dump' does + # not exist. Some strange typing objects may cause this issue. + # Here the workaround just set "_abc_impl" to None. + clsdict["_abc_impl"] = None else: # In the above if clause, registry is a set of weakrefs -- in # this case, registry is a WeakSet @@ -246,13 +217,13 @@ def _code_reduce(obj): """codeobject reducer""" if hasattr(obj, "co_posonlyargcount"): # pragma: no branch args = ( - obj.co_argcount, obj.co_posonlyargcount, - obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, - obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, - obj.co_varnames, obj.co_filename, obj.co_name, - obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, - obj.co_cellvars - ) + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, + obj.co_cellvars + ) else: args = ( obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, @@ -264,14 +235,26 @@ def _code_reduce(obj): return types.CodeType, args +def _make_cell(contents): + cell = _make_empty_cell() + cell_set(cell, contents) + return cell + + def _cell_reduce(obj): """Cell (containing values of a function's free variables) reducer""" try: - obj.cell_contents + contents = (obj.cell_contents,) except ValueError: # cell is empty - return _make_empty_cell, () + contents = () + + if sys.version_info[:2] < (3, 8): + if contents: + return _make_cell, contents + else: + return _make_empty_cell, () else: - return _make_cell, (obj.cell_contents, ) + return types.CellType, contents def _classmethod_reduce(obj): @@ -315,10 +298,10 @@ def _file_reduce(obj): obj.seek(0) contents = obj.read() obj.seek(curloc) - except IOError as e: + except IOError: raise pickle.PicklingError( "Cannot pickle file %s as it cannot be read" % name - ) from e + ) retval.write(contents) retval.seek(curloc) @@ -339,11 +322,11 @@ def _memoryview_reduce(obj): def _module_reduce(obj): - if _is_importable(obj): - return subimport, (obj.__name__,) - else: + if _is_dynamic(obj): obj.__dict__.pop('__builtins__', None) return dynamic_subimport, (obj.__name__, vars(obj)) + else: + return subimport, (obj.__name__,) def _method_reduce(obj): @@ -396,29 +379,11 @@ def _class_reduce(obj): return type, (NotImplemented,) elif obj in _BUILTIN_TYPE_NAMES: return _builtin_type, (_BUILTIN_TYPE_NAMES[obj],) - elif not _is_importable(obj): + elif not _is_importable_by_name(obj): return _dynamic_class_reduce(obj) return NotImplemented -def _dict_keys_reduce(obj): - # Safer not to ship the full dict as sending the rest might - # be unintended and could potentially cause leaking of - # sensitive information - return _make_dict_keys, (list(obj), ) - - -def _dict_values_reduce(obj): - # Safer not to ship the full dict as sending the rest might - # be unintended and could potentially cause leaking of - # sensitive information - return _make_dict_values, (list(obj), ) - - -def _dict_items_reduce(obj): - return _make_dict_items, (dict(obj), ) - - # COLLECTIONS OF OBJECTS STATE SETTERS # ------------------------------------ # state setters are called at unpickling time, once the object is created and @@ -474,30 +439,154 @@ def _class_setstate(obj, state): return obj +def _numpy_frombuffer(buffer, dtype, shape, order): + # Get the _frombuffer() function for reconstruction + from numpy.core.numeric import _frombuffer + array = _frombuffer(buffer, dtype, shape, order) + # Unfortunately, numpy does not follow the standard, so we still + # have to set the readonly flag for it here. + array.setflags(write=isinstance(buffer, bytearray) or not buffer.readonly) + return array + + +def _numpy_ndarray_reduce(array): + # This function is implemented according to 'array_reduce_ex_picklebuffer' + # in numpy C backend. This is a workaround for python3.5 pickling support. + if sys.version_info >= (3, 8): + import pickle + picklebuf_class = pickle.PickleBuffer + elif sys.version_info >= (3, 5): + try: + import pickle5 + picklebuf_class = pickle5.PickleBuffer + except Exception: + raise ImportError("Using pickle protocol 5 requires the pickle5 " + "module for Python >=3.5 and <3.8") + else: + raise ValueError("pickle protocol 5 is not available for Python < 3.5") + # if the array if Fortran-contiguous and not C-contiguous, + # the PickleBuffer instance will hold a view on the transpose + # of the initial array, that is C-contiguous. + if not array.flags.c_contiguous and array.flags.f_contiguous: + order = "F" + picklebuf_args = array.transpose() + else: + order = "C" + picklebuf_args = array + try: + buffer = picklebuf_class(picklebuf_args) + except Exception: + # Some arrays may refuse to export a buffer, in which case + # just fall back on regular __reduce_ex__ implementation + # (gh-12745). + return array.__reduce__() + + return _numpy_frombuffer, (buffer, array.dtype, array.shape, order) + + class CloudPickler(Pickler): - # set of reducers defined and used by cloudpickle (private) - _dispatch_table = {} - _dispatch_table[classmethod] = _classmethod_reduce - _dispatch_table[io.TextIOWrapper] = _file_reduce - _dispatch_table[logging.Logger] = _logger_reduce - _dispatch_table[logging.RootLogger] = _root_logger_reduce - _dispatch_table[memoryview] = _memoryview_reduce - _dispatch_table[property] = _property_reduce - _dispatch_table[staticmethod] = _classmethod_reduce - _dispatch_table[CellType] = _cell_reduce - _dispatch_table[types.CodeType] = _code_reduce - _dispatch_table[types.GetSetDescriptorType] = _getset_descriptor_reduce - _dispatch_table[types.ModuleType] = _module_reduce - _dispatch_table[types.MethodType] = _method_reduce - _dispatch_table[types.MappingProxyType] = _mappingproxy_reduce - _dispatch_table[weakref.WeakSet] = _weakset_reduce - _dispatch_table[typing.TypeVar] = _typevar_reduce - _dispatch_table[_collections_abc.dict_keys] = _dict_keys_reduce - _dispatch_table[_collections_abc.dict_values] = _dict_values_reduce - _dispatch_table[_collections_abc.dict_items] = _dict_items_reduce + """Fast C Pickler extension with additional reducing routines. + + CloudPickler's extensions exist into into: + + * its dispatch_table containing reducers that are called only if ALL + built-in saving functions were previously discarded. + * a special callback named "reducer_override", invoked before standard + function/class builtin-saving method (save_global), to serialize dynamic + functions + """ + + # cloudpickle's own dispatch_table, containing the additional set of + # objects (compared to the standard library pickle) that cloupickle can + # serialize. + dispatch = {} + dispatch[classmethod] = _classmethod_reduce + dispatch[io.TextIOWrapper] = _file_reduce + dispatch[logging.Logger] = _logger_reduce + dispatch[logging.RootLogger] = _root_logger_reduce + dispatch[memoryview] = _memoryview_reduce + dispatch[property] = _property_reduce + dispatch[staticmethod] = _classmethod_reduce + if sys.version_info[:2] >= (3, 8): + dispatch[types.CellType] = _cell_reduce + else: + dispatch[type(_make_empty_cell())] = _cell_reduce + dispatch[types.CodeType] = _code_reduce + dispatch[types.GetSetDescriptorType] = _getset_descriptor_reduce + dispatch[types.ModuleType] = _module_reduce + dispatch[types.MethodType] = _method_reduce + dispatch[types.MappingProxyType] = _mappingproxy_reduce + dispatch[weakref.WeakSet] = _weakset_reduce + dispatch[typing.TypeVar] = _typevar_reduce + + def __init__(self, file, protocol=None, buffer_callback=None): + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__(self, file, protocol=protocol, buffer_callback=buffer_callback) + # map functions __globals__ attribute ids, to ensure that functions + # sharing the same global namespace at pickling time also share their + # global namespace at unpickling time. + self.globals_ref = {} + + # Take into account potential custom reducers registered by external + # modules + self.dispatch_table = copyreg.dispatch_table.copy() + self.dispatch_table.update(self.dispatch) + self.proto = int(protocol) + + def reducer_override(self, obj): + """Type-agnostic reducing callback for function and classes. + + For performance reasons, subclasses of the C _pickle.Pickler class + cannot register custom reducers for functions and classes in the + dispatch_table. Reducer for such types must instead implemented in the + special reducer_override method. + + Note that method will be called for any object except a few + builtin-types (int, lists, dicts etc.), which differs from reducers in + the Pickler's dispatch_table, each of them being invoked for objects of + a specific type only. + + This property comes in handy for classes: although most classes are + instances of the ``type`` metaclass, some of them can be instances of + other custom metaclasses (such as enum.EnumMeta for example). In + particular, the metaclass will likely not be known in advance, and thus + cannot be special-cased using an entry in the dispatch_table. + reducer_override, among other things, allows us to register a reducer + that will be called for any class, independently of its type. - dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table) + Notes: + + * reducer_override has the priority over dispatch_table-registered + reducers. + * reducer_override can be used to fix other limitations of cloudpickle + for other types that suffered from type-specific reducers, such as + Exceptions. See https://github.com/cloudpipe/cloudpickle/issues/248 + """ + + # This is a patch for python3.5 + if isinstance(obj, numpy.ndarray): + if (self.proto < 5 or + (not obj.flags.c_contiguous and not obj.flags.f_contiguous) or + (issubclass(type(obj), numpy.ndarray) and type(obj) is not numpy.ndarray) or + obj.dtype == "O" or obj.itemsize == 0): + return NotImplemented + return _numpy_ndarray_reduce(obj) + + t = type(obj) + try: + is_anyclass = issubclass(t, type) + except TypeError: # t is not a class (old Boost; see SF #502085) + is_anyclass = False + + if is_anyclass: + return _class_reduce(obj) + elif isinstance(obj, types.FunctionType): + return self._function_reduce(obj) + else: + # fallback to save_global, including the Pickler's distpatch_table + return NotImplemented # function reducers are defined as instance methods of CloudPickler # objects, as they rely on a CloudPickler attribute (globals_ref) @@ -520,7 +609,7 @@ class CloudPickler(Pickler): As opposed to cloudpickle.py, There no special handling for builtin pypy functions because cloudpickle_fast is CPython-specific. """ - if _is_importable(obj): + if _is_importable_by_name(obj): return NotImplemented else: return self._dynamic_function_reduce(obj) @@ -553,8 +642,12 @@ class CloudPickler(Pickler): if func.__closure__ is None: closure = None else: - closure = tuple( - _make_empty_cell() for _ in range(len(code.co_freevars))) + if sys.version_info[:2] >= (3, 8): + closure = tuple( + types.CellType() for _ in range(len(code.co_freevars))) + else: + closure = tuple( + _make_empty_cell() for _ in range(len(code.co_freevars))) return code, base_globals, None, None, closure @@ -567,204 +660,6 @@ class CloudPickler(Pickler): "Could not pickle object as excessively deep recursion " "required." ) - raise pickle.PicklingError(msg) from e + raise pickle.PicklingError(msg) else: raise - - if pickle.HIGHEST_PROTOCOL >= 5: - # `CloudPickler.dispatch` is only left for backward compatibility - note - # that when using protocol 5, `CloudPickler.dispatch` is not an - # extension of `Pickler.dispatch` dictionary, because CloudPickler - # subclasses the C-implemented Pickler, which does not expose a - # `dispatch` attribute. Earlier versions of the protocol 5 CloudPickler - # used `CloudPickler.dispatch` as a class-level attribute storing all - # reducers implemented by cloudpickle, but the attribute name was not a - # great choice given the meaning of `Cloudpickler.dispatch` when - # `CloudPickler` extends the pure-python pickler. - dispatch = dispatch_table - - # Implementation of the reducer_override callback, in order to - # efficiently serialize dynamic functions and classes by subclassing - # the C-implemented Pickler. - # TODO: decorrelate reducer_override (which is tied to CPython's - # implementation - would it make sense to backport it to pypy? - and - # pickle's protocol 5 which is implementation agnostic. Currently, the - # availability of both notions coincide on CPython's pickle and the - # pickle5 backport, but it may not be the case anymore when pypy - # implements protocol 5 - def __init__(self, file, protocol=None, buffer_callback=None): - if protocol is None: - protocol = DEFAULT_PROTOCOL - Pickler.__init__( - self, file, protocol=protocol, buffer_callback=buffer_callback - ) - # map functions __globals__ attribute ids, to ensure that functions - # sharing the same global namespace at pickling time also share - # their global namespace at unpickling time. - self.globals_ref = {} - self.proto = int(protocol) - - def reducer_override(self, obj): - """Type-agnostic reducing callback for function and classes. - - For performance reasons, subclasses of the C _pickle.Pickler class - cannot register custom reducers for functions and classes in the - dispatch_table. Reducer for such types must instead implemented in - the special reducer_override method. - - Note that method will be called for any object except a few - builtin-types (int, lists, dicts etc.), which differs from reducers - in the Pickler's dispatch_table, each of them being invoked for - objects of a specific type only. - - This property comes in handy for classes: although most classes are - instances of the ``type`` metaclass, some of them can be instances - of other custom metaclasses (such as enum.EnumMeta for example). In - particular, the metaclass will likely not be known in advance, and - thus cannot be special-cased using an entry in the dispatch_table. - reducer_override, among other things, allows us to register a - reducer that will be called for any class, independently of its - type. - - - Notes: - - * reducer_override has the priority over dispatch_table-registered - reducers. - * reducer_override can be used to fix other limitations of - cloudpickle for other types that suffered from type-specific - reducers, such as Exceptions. See - https://github.com/cloudpipe/cloudpickle/issues/248 - """ - if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch - return ( - _create_parametrized_type_hint, - parametrized_type_hint_getinitargs(obj) - ) - t = type(obj) - try: - is_anyclass = issubclass(t, type) - except TypeError: # t is not a class (old Boost; see SF #502085) - is_anyclass = False - - if is_anyclass: - return _class_reduce(obj) - elif isinstance(obj, types.FunctionType): - return self._function_reduce(obj) - else: - # fallback to save_global, including the Pickler's - # distpatch_table - return NotImplemented - - else: - # When reducer_override is not available, hack the pure-Python - # Pickler's types.FunctionType and type savers. Note: the type saver - # must override Pickler.save_global, because pickle.py contains a - # hard-coded call to save_global when pickling meta-classes. - dispatch = Pickler.dispatch.copy() - - def __init__(self, file, protocol=None): - if protocol is None: - protocol = DEFAULT_PROTOCOL - Pickler.__init__(self, file, protocol=protocol) - # map functions __globals__ attribute ids, to ensure that functions - # sharing the same global namespace at pickling time also share - # their global namespace at unpickling time. - self.globals_ref = {} - assert hasattr(self, 'proto') - - def _save_reduce_pickle5(self, func, args, state=None, listitems=None, - dictitems=None, state_setter=None, obj=None): - save = self.save - write = self.write - self.save_reduce( - func, args, state=None, listitems=listitems, - dictitems=dictitems, obj=obj - ) - # backport of the Python 3.8 state_setter pickle operations - save(state_setter) - save(obj) # simple BINGET opcode as obj is already memoized. - save(state) - write(pickle.TUPLE2) - # Trigger a state_setter(obj, state) function call. - write(pickle.REDUCE) - # The purpose of state_setter is to carry-out an - # inplace modification of obj. We do not care about what the - # method might return, so its output is eventually removed from - # the stack. - write(pickle.POP) - - def save_global(self, obj, name=None, pack=struct.pack): - """ - Save a "global". - - The name of this method is somewhat misleading: all types get - dispatched here. - """ - if obj is type(None): # noqa - return self.save_reduce(type, (None,), obj=obj) - elif obj is type(Ellipsis): - return self.save_reduce(type, (Ellipsis,), obj=obj) - elif obj is type(NotImplemented): - return self.save_reduce(type, (NotImplemented,), obj=obj) - elif obj in _BUILTIN_TYPE_NAMES: - return self.save_reduce( - _builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj) - - if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch - # Parametrized typing constructs in Python < 3.7 are not - # compatible with type checks and ``isinstance`` semantics. For - # this reason, it is easier to detect them using a - # duck-typing-based check (``_is_parametrized_type_hint``) than - # to populate the Pickler's dispatch with type-specific savers. - self.save_reduce( - _create_parametrized_type_hint, - parametrized_type_hint_getinitargs(obj), - obj=obj - ) - elif name is not None: - Pickler.save_global(self, obj, name=name) - elif not _is_importable(obj, name=name): - self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj) - else: - Pickler.save_global(self, obj, name=name) - dispatch[type] = save_global - - def save_function(self, obj, name=None): - """ Registered with the dispatch to handle all function types. - - Determines what kind of function obj is (e.g. lambda, defined at - interactive prompt, etc) and handles the pickling appropriately. - """ - if _is_importable(obj, name=name): - return Pickler.save_global(self, obj, name=name) - elif PYPY and isinstance(obj.__code__, builtin_code_type): - return self.save_pypy_builtin_func(obj) - else: - return self._save_reduce_pickle5( - *self._dynamic_function_reduce(obj), obj=obj - ) - - def save_pypy_builtin_func(self, obj): - """Save pypy equivalent of builtin functions. - PyPy does not have the concept of builtin-functions. Instead, - builtin-functions are simple function instances, but with a - builtin-code attribute. - Most of the time, builtin functions should be pickled by attribute. - But PyPy has flaky support for __qualname__, so some builtin - functions such as float.__new__ will be classified as dynamic. For - this reason only, we created this special routine. Because - builtin-functions are not expected to have closure or globals, - there is no additional hack (compared the one already implemented - in pickle) to protect ourselves from reference cycles. A simple - (reconstructor, newargs, obj.__dict__) tuple is save_reduced. Note - also that PyPy improved their support for __qualname__ in v3.6, so - this routing should be removed when cloudpickle supports only PyPy - 3.6 and later. - """ - rv = (types.FunctionType, (obj.__code__, {}, obj.__name__, - obj.__defaults__, obj.__closure__), - obj.__dict__) - self.save_reduce(*rv, obj=obj) - - dispatch[types.FunctionType] = save_function diff --git a/python/ray/cloudpickle/compat.py b/python/ray/cloudpickle/compat.py deleted file mode 100644 index afa285f62..000000000 --- a/python/ray/cloudpickle/compat.py +++ /dev/null @@ -1,13 +0,0 @@ -import sys - - -if sys.version_info < (3, 8): - try: - import pickle5 as pickle # noqa: F401 - from pickle5 import Pickler # noqa: F401 - except ImportError: - import pickle # noqa: F401 - from pickle import _Pickler as Pickler # noqa: F401 -else: - import pickle # noqa: F401 - from _pickle import Pickler # noqa: F401 diff --git a/python/ray/includes/serialization.pxi b/python/ray/includes/serialization.pxi index f20165fc1..31a5f1e04 100644 --- a/python/ray/includes/serialization.pxi +++ b/python/ray/includes/serialization.pxi @@ -1,5 +1,6 @@ from libc.string cimport memcpy from libc.stdint cimport uintptr_t, uint64_t, INT32_MAX +from libcpp cimport nullptr import cython DEF MEMCOPY_THREADS = 6 @@ -115,9 +116,6 @@ cdef class SubBuffer: self.buf, self.len) def __getbuffer__(self, Py_buffer* buffer, int flags): - if flags & cpython.PyBUF_WRITABLE: - # Ray ensures all buffers are immutable. - raise BufferError buffer.readonly = self.readonly buffer.buf = self.buf buffer.format = self._format.c_str() diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index dee43ed66..055407a7f 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -55,7 +55,6 @@ py_test_module_list( "test_reconstruction.py", "test_reference_counting_2.py", "test_reference_counting.py", - "test_serialization.py", "test_stress.py", "test_stress_sharded.py", "test_unreconstructable_errors.py", @@ -90,6 +89,7 @@ py_test_module_list( "test_node_manager.py", "test_numba.py", "test_ray_init.py", + "test_serialization.py", "test_tempfile.py", "test_webui.py", ], diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 203e484fc..63837c8a2 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1,9 +1,11 @@ # coding: utf-8 +import io import logging import os import pickle import sys import time +import weakref import numpy as np import pytest @@ -414,6 +416,51 @@ def test_ray_recursive_objects(ray_start_shared_local_modes): ray.put(obj) +def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes): + # bpo-39492: reducer_override used to induce a spurious reference cycle + # inside the Pickler object, that could prevent all serialized objects + # from being garbage-collected without explicity invoking gc.collect. + + # test a dynamic function + def f(): + return 4669201609102990671853203821578 + + wr = weakref.ref(f) + + bio = io.BytesIO() + from ray.cloudpickle import CloudPickler, loads, dumps + p = CloudPickler(bio, protocol=5) + p.dump(f) + new_f = loads(bio.getvalue()) + assert new_f() == 4669201609102990671853203821578 + + del p + del f + + assert wr() is None + + # test a dynamic class + class ShortlivedObject: + def __del__(self): + print("Went out of scope!") + + obj = ShortlivedObject() + new_obj = weakref.ref(obj) + + dumps(obj) + del obj + assert new_obj() is None + + +def test_deserialized_from_buffer_immutable(ray_start_shared_local_modes): + x = np.full((2, 2), 1.) + o = ray.put(x) + y = ray.get(o) + with pytest.raises( + ValueError, match="assignment destination is read-only"): + y[0, 0] = 9. + + def test_passing_arguments_by_value_out_of_the_box( ray_start_shared_local_modes): @ray.remote diff --git a/python/ray/tests/test_serialization.py b/python/ray/tests/test_serialization.py index 8004e694d..922c72937 100644 --- a/python/ray/tests/test_serialization.py +++ b/python/ray/tests/test_serialization.py @@ -5,7 +5,6 @@ import logging import re import string import sys -import weakref import numpy as np import pytest @@ -498,51 +497,6 @@ def test_register_class(ray_start_2_cpus): assert not hasattr(c2, "method1") -def test_deserialized_from_buffer_immutable(ray_start_shared_local_modes): - x = np.full((2, 2), 1.) - o = ray.put(x) - y = ray.get(o) - with pytest.raises( - ValueError, match="assignment destination is read-only"): - y[0, 0] = 9. - - -def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes): - # bpo-39492: reducer_override used to induce a spurious reference cycle - # inside the Pickler object, that could prevent all serialized objects - # from being garbage-collected without explicity invoking gc.collect. - - # test a dynamic function - def f(): - return 4669201609102990671853203821578 - - wr = weakref.ref(f) - - bio = io.BytesIO() - from ray.cloudpickle import CloudPickler, loads, dumps - p = CloudPickler(bio, protocol=5) - p.dump(f) - new_f = loads(bio.getvalue()) - assert new_f() == 4669201609102990671853203821578 - - del p - del f - - assert wr() is None - - # test a dynamic class - class ShortlivedObject: - def __del__(self): - print("Went out of scope!") - - obj = ShortlivedObject() - new_obj = weakref.ref(obj) - - dumps(obj) - del obj - assert new_obj() is None - - if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__]))