diff --git a/doc/requirements-doc.txt b/doc/requirements-doc.txt index c609099fe..42a6e4dad 100644 --- a/doc/requirements-doc.txt +++ b/doc/requirements-doc.txt @@ -10,6 +10,7 @@ mock numpy opencv-python-headless pandas +pickle5 pygments psutil pyyaml diff --git a/python/ray/cloudpickle/__init__.py b/python/ray/cloudpickle/__init__.py index 4389bd24c..316e4d099 100644 --- a/python/ray/cloudpickle/__init__.py +++ b/python/ray/cloudpickle/__init__.py @@ -1,19 +1,5 @@ from __future__ import absolute_import -import os -import sys -CLOUDPICKLE_PATH = os.path.dirname(os.path.realpath(__file__)) +from ray.cloudpickle.cloudpickle_fast import * # noqa: F401, F403 -if os.path.exists(os.path.join(CLOUDPICKLE_PATH, "..", "pickle5_files", "pickle5")): - HAS_PICKLE5 = True -else: - HAS_PICKLE5 = False - -if sys.version_info[:2] >= (3, 8) or HAS_PICKLE5: - from ray.cloudpickle.cloudpickle_fast import * - FAST_CLOUDPICKLE_USED = True -else: - from ray.cloudpickle.cloudpickle import * - FAST_CLOUDPICKLE_USED = False - -__version__ = '1.2.2.dev0' +__version__ = "1.2.2.dev0" diff --git a/python/ray/cluster_utils.py b/python/ray/cluster_utils.py index d31891e39..0fa997c1f 100644 --- a/python/ray/cluster_utils.py +++ b/python/ray/cluster_utils.py @@ -78,7 +78,6 @@ class Cluster: "object_store_memory": 150 * 1024 * 1024, # 150 MiB } ray_params = ray.parameter.RayParams(**node_args) - ray_params.use_pickle = ray.cloudpickle.FAST_CLOUDPICKLE_USED ray_params.update_if_absent(**default_kwargs) if self.head_node is None: node = ray.node.Node( diff --git a/python/ray/node.py b/python/ray/node.py index 9758b5894..28030d8fa 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -251,10 +251,6 @@ class Node: def load_code_from_local(self): return self._ray_params.load_code_from_local - @property - def use_pickle(self): - return self._ray_params.use_pickle - @property def object_id_seed(self): """Get the seed for deterministic generation of object IDs""" @@ -569,7 +565,6 @@ class Node: include_java=self._ray_params.include_java, java_worker_options=self._ray_params.java_worker_options, load_code_from_local=self._ray_params.load_code_from_local, - use_pickle=self._ray_params.use_pickle, fate_share=self.kernel_fate_share) assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info] diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 4db7a6f10..873333555 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -5,6 +5,8 @@ from packaging import version import ray.ray_constants as ray_constants +logger = logging.getLogger(__name__) + class RayParams: """A class used to store the parameters used by Ray. @@ -79,7 +81,6 @@ class RayParams: Java worker. java_worker_options (list): The command options for Java worker. load_code_from_local: Whether load code from local file or from GCS. - use_pickle: Whether data objects should be serialized with cloudpickle. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. """ @@ -120,7 +121,6 @@ class RayParams: include_java=False, java_worker_options=None, load_code_from_local=False, - use_pickle=False, _internal_config=None): self.object_id_seed = object_id_seed self.redis_address = redis_address @@ -155,7 +155,6 @@ class RayParams: self.include_java = include_java self.java_worker_options = java_worker_options self.load_code_from_local = load_code_from_local - self.use_pickle = use_pickle self._internal_config = _internal_config self._check_usage() @@ -209,9 +208,6 @@ class RayParams: raise DeprecationWarning( "The redirect_output argument is deprecated.") - if self.use_pickle: - assert (version.parse( - np.__version__) >= version.parse("1.16.0")), ( - "numpy >= 1.16.0 required for use_pickle=True support. " - "You can use ray.init(use_pickle=False) for older numpy " - "versions, but this may be removed in future versions.") + if version.parse(np.__version__) < version.parse("1.16.0"): + logger.warning("Using ray with numpy < 1.16.0 will result in slow " + "serialization. Upgrade numpy if using with ray.") diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 51d22e964..09cfddd7e 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -275,11 +275,6 @@ def dashboard(cluster_config_file, cluster_name, port): is_flag=True, default=False, help="Specify whether load code from local file or GCS serialization.") -@click.option( - "--use-pickle/--no-use-pickle", - is_flag=True, - default=ray.cloudpickle.FAST_CLOUDPICKLE_USED, - help="Use pickle for serialization.") def start(node_ip_address, redis_address, address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, memory, @@ -287,8 +282,7 @@ def start(node_ip_address, redis_address, address, redis_port, head, include_webui, webui_host, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, include_java, - java_worker_options, load_code_from_local, use_pickle, - internal_config): + java_worker_options, load_code_from_local, internal_config): if redis_address is not None: raise DeprecationWarning("The --redis-address argument is " "deprecated. Please use --address instead.") @@ -334,7 +328,6 @@ def start(node_ip_address, redis_address, address, redis_port, webui_host=webui_host, java_worker_options=java_worker_options, load_code_from_local=load_code_from_local, - use_pickle=use_pickle, _internal_config=internal_config) if head: # Start Ray on the head node. diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 2d5cd561d..ee49856b0 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -130,8 +130,6 @@ class SerializationContext: def __init__(self, worker): self.worker = worker - assert worker.use_pickle - self.use_pickle = worker.use_pickle self._thread_local = threading.local() def actor_handle_serializer(obj): @@ -210,8 +208,6 @@ class SerializationContext: def _register_cloudpickle_serializer(self, cls, custom_serializer, custom_deserializer): - assert pickle.FAST_CLOUDPICKLE_USED - def _CloudPicklerReducer(obj): return custom_deserializer, (custom_serializer(obj), ) @@ -249,10 +245,6 @@ class SerializationContext: self._thread_local.object_ids.add(object_id) def _deserialize_pickle5_data(self, data): - if not self.use_pickle: - raise ValueError("Receiving pickle5 serialized objects " - "while the serialization context is " - "using a custom raw backend.") try: in_band, buffers = unpack_pickle5_buffers(data) if len(buffers) > 0: @@ -366,8 +358,6 @@ class SerializationContext: else: metadata = ray_constants.PICKLE5_BUFFER_METADATA - assert self.worker.use_pickle - assert ray.cloudpickle.FAST_CLOUDPICKLE_USED writer = Pickle5Writer() # TODO(swang): Check that contained_object_ids is empty. try: @@ -386,10 +376,8 @@ class SerializationContext: def register_custom_serializer(self, cls, - use_pickle=False, - use_dict=False, - serializer=None, - deserializer=None, + serializer, + deserializer, local=False, job_id=None, class_id=None): @@ -402,15 +390,8 @@ class SerializationContext: Args: cls (type): The class that ray should use this custom serializer for. - use_pickle (bool): If true, then objects of this class will be - serialized using pickle. - use_dict: If true, then objects of this class be serialized - turning their __dict__ fields into a dictionary. Must be False - if use_pickle is true. - serializer: The custom serializer to use. This should be provided - if and only if use_pickle and use_dict are False. - deserializer: The custom deserializer to use. This should be - provided if and only if use_pickle and use_dict are False. + serializer: The custom serializer to use. + deserializer: The custom deserializer to use. local: True if the serializers should only be registered on the current worker. This should usually be False. job_id: ID of the job that we want to register the class for. @@ -421,23 +402,8 @@ class SerializationContext: cannot be efficiently serialized by Ray. ValueError: Raised if ray could not autogenerate a class_id. """ - assert (serializer is None) == (deserializer is None), ( - "The serializer/deserializer arguments must both be provided or " - "both not be provided.") - use_custom_serializer = (serializer is not None) - - assert use_custom_serializer + use_pickle + use_dict == 1, ( - "Exactly one of use_pickle, use_dict, or serializer/deserializer " - "must be specified.") - - if self.worker.use_pickle and serializer is None: - # In this case it should do nothing. - return - - if use_dict: - # Raise an exception if cls cannot be serialized - # efficiently by Ray. - check_serializable(cls) + assert serializer is not None and deserializer is not None, ( + "Must provide serializer and deserializer.") if class_id is None: if not local: @@ -470,7 +436,6 @@ class SerializationContext: assert isinstance(job_id, JobID) def register_class_for_serialization(worker_info): - assert worker_info["worker"].use_pickle context = worker_info["worker"].get_serialization_context(job_id) context._register_cloudpickle_serializer(cls, serializer, deserializer) @@ -482,56 +447,3 @@ class SerializationContext: # Since we are pickling objects of this class, we don't actually # need to ship the class definition. register_class_for_serialization({"worker": self.worker}) - - -def check_serializable(cls): - """Throws an exception if Ray cannot serialize this class efficiently. - - Args: - cls (type): The class to be serialized. - - Raises: - Exception: An exception is raised if Ray cannot serialize this class - efficiently. - """ - if is_named_tuple(cls): - # This case works. - return - if not hasattr(cls, "__new__"): - print("The class {} does not have a '__new__' attribute and is " - "probably an old-stye class. Please make it a new-style class " - "by inheriting from 'object'.") - raise RayNotDictionarySerializable("The class {} does not have a " - "'__new__' attribute and is " - "probably an old-style class. We " - "do not support this. Please make " - "it a new-style class by " - "inheriting from 'object'." - .format(cls)) - try: - obj = cls.__new__(cls) - except Exception: - raise RayNotDictionarySerializable("The class {} has overridden " - "'__new__', so Ray may not be " - "able to serialize it " - "efficiently.".format(cls)) - if not hasattr(obj, "__dict__"): - raise RayNotDictionarySerializable("Objects of the class {} do not " - "have a '__dict__' attribute, so " - "Ray cannot serialize it " - "efficiently.".format(cls)) - if hasattr(obj, "__slots__"): - raise RayNotDictionarySerializable("The class {} uses '__slots__', so " - "Ray may not be able to serialize " - "it efficiently.".format(cls)) - - -def is_named_tuple(cls): - """Return True if cls is a namedtuple and False otherwise.""" - b = cls.__bases__ - if len(b) != 1 or b[0] != tuple: - return False - f = getattr(cls, "_fields", None) - if not isinstance(f, tuple): - return False - return all(type(n) == str for n in f) diff --git a/python/ray/services.py b/python/ray/services.py index 8853c8e59..ec8df1cda 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1185,7 +1185,6 @@ def start_raylet(redis_address, include_java=False, java_worker_options=None, load_code_from_local=False, - use_pickle=False, fate_share=None): """Start a raylet, which is a combined local scheduler and object manager. @@ -1218,7 +1217,6 @@ def start_raylet(redis_address, include_java (bool): If True, the raylet backend can also support Java worker. java_worker_options (list): The command options for Java worker. - use_pickle (bool): If True, use cloudpickle for serialization. Returns: ProcessInfo for the process that was started. """ @@ -1281,8 +1279,6 @@ def start_raylet(redis_address, if load_code_from_local: start_worker_command += ["--load-code-from-local"] - if use_pickle: - start_worker_command += ["--use-pickle"] command = [ RAYLET_EXECUTABLE, diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c6ecd4bbf..26c2f515a 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -23,6 +23,17 @@ from ray.exceptions import RayTimeoutError logger = logging.getLogger(__name__) +def is_named_tuple(cls): + """Return True if cls is a namedtuple and False otherwise.""" + b = cls.__bases__ + if len(b) != 1 or b[0] != tuple: + return False + f = getattr(cls, "_fields", None) + if not isinstance(f, tuple): + return False + return all(type(n) == str for n in f) + + # https://github.com/ray-project/ray/issues/6662 def test_ignore_http_proxy(shutdown_only): ray.init(num_cpus=1) @@ -169,7 +180,7 @@ def test_fair_queueing(shutdown_only): assert len(ready) == 1000, len(ready) -def complex_serialization(use_pickle): +def test_complex_serialization(ray_start_regular): def assert_equal(obj1, obj2): module_numpy = (type(obj1).__module__ == np.__name__ or type(obj2).__module__ == np.__name__) @@ -208,8 +219,7 @@ def complex_serialization(use_pickle): obj1, obj2)) for i in range(len(obj1)): assert_equal(obj1[i], obj2[i]) - elif (ray.serialization.is_named_tuple(type(obj1)) - or ray.serialization.is_named_tuple(type(obj2))): + elif (is_named_tuple(type(obj1)) or is_named_tuple(type(obj2))): assert len(obj1) == len(obj2), ( "Objects {} and {} are named " "tuples with different lengths.".format(obj1, obj2)) @@ -369,15 +379,6 @@ def complex_serialization(use_pickle): assert ray.get(ray.put(s)).readline() == line -def test_complex_serialization(ray_start_regular): - complex_serialization(use_pickle=False) - - -def test_complex_serialization_with_pickle(shutdown_only): - ray.init(use_pickle=True) - complex_serialization(use_pickle=True) - - def test_numpy_serialization(ray_start_regular): array = np.zeros(314) from ray.cloudpickle import dumps @@ -421,8 +422,6 @@ def test_numpy_subclass_serialization_pickle(ray_start_regular): print(self.constant) constant = MyNumpyConstant(123) - ray.register_custom_serializer(type(constant), use_pickle=True) - repr_orig = repr(constant) repr_ser = repr(ray.get(ray.put(constant))) assert repr_orig == repr_ser @@ -515,16 +514,9 @@ def test_ray_recursive_objects(ray_start_regular): # Create a list of recursive objects. recursive_objects = [lst, a1, a2, a3, d1] - if ray.worker.global_worker.use_pickle: - # Serialize the recursive objects. - for obj in recursive_objects: - ray.put(obj) - else: - # Check that exceptions are thrown when we serialize the recursive - # objects. - for obj in recursive_objects: - with pytest.raises(Exception): - ray.put(obj) + # Serialize the recursive objects. + for obj in recursive_objects: + ray.put(obj) def test_reducer_override_no_reference_cycle(ray_start_regular): @@ -647,7 +639,7 @@ def test_put_get(shutdown_only): assert value_before == value_after -def custom_serializers(): +def test_custom_serializers(ray_start_regular): class Foo: def __init__(self): self.x = 3 @@ -677,30 +669,6 @@ def custom_serializers(): assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2") -def test_custom_serializers(ray_start_regular): - custom_serializers() - - -def test_custom_serializers_with_pickle(shutdown_only): - ray.init(use_pickle=True) - custom_serializers() - - class Foo: - def __init__(self): - self.x = 4 - - # Test the pickle serialization backend without serializer. - # NOTE: 'use_pickle' here is different from 'use_pickle' in - # ray.init - ray.register_custom_serializer(Foo, use_pickle=True) - - @ray.remote - def f(): - return Foo() - - assert type(ray.get(f.remote())) == Foo - - def test_serialization_final_fallback(ray_start_regular): pytest.importorskip("catboost") # This test will only run when "catboost" is installed. diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index b1b11304b..22ce94c02 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -594,26 +594,6 @@ print("success") ray.get(f.remote()) -@pytest.mark.parametrize( - "call_ray_start", ["ray start --head --num-cpus=1 --use-pickle"], - indirect=True) -def test_use_pickle(call_ray_start): - address = call_ray_start - - ray.init(address=address, use_pickle=True) - - assert ray.worker.global_worker.use_pickle - x = (2, "hello") - - @ray.remote - def f(x): - assert x == (2, "hello") - assert ray.worker.global_worker.use_pickle - return (3, "world") - - assert ray.get(f.remote(x)) == (3, "world") - - if __name__ == "__main__": import pytest import sys diff --git a/python/ray/worker.py b/python/ray/worker.py index 568d4a967..cc345c037 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -145,11 +145,6 @@ class Worker: self.check_connected() return self.node.load_code_from_local - @property - def use_pickle(self): - self.check_connected() - return self.node.use_pickle - @property def current_job_id(self): if hasattr(self, "core_worker"): @@ -555,7 +550,7 @@ def init(address=None, raylet_socket_name=None, temp_dir=None, load_code_from_local=False, - use_pickle=ray.cloudpickle.FAST_CLOUDPICKLE_USED, + use_pickle=True, _internal_config=None): """Connect to an existing Ray cluster or start one and connect to it. @@ -649,7 +644,7 @@ def init(address=None, directory for the Ray process. load_code_from_local: Whether code should be loaded from a local module or from the GCS. - use_pickle: Whether data objects should be serialized with cloudpickle. + use_pickle: Deprecated. _internal_config (str): JSON configuration for overriding RayConfig defaults. For testing purposes ONLY. @@ -661,6 +656,9 @@ def init(address=None, arguments is passed in. """ + if not use_pickle: + raise DeprecationWarning("The use_pickle argument is deprecated.") + if redis_address is not None: raise DeprecationWarning("The redis_address argument is deprecated. " "Please use address instead.") @@ -731,7 +729,6 @@ def init(address=None, raylet_socket_name=raylet_socket_name, temp_dir=temp_dir, load_code_from_local=load_code_from_local, - use_pickle=use_pickle, _internal_config=_internal_config, ) # Start the Ray processes. We set shutdown_at_exit=False because we @@ -792,8 +789,7 @@ def init(address=None, redis_password=redis_password, object_id_seed=object_id_seed, temp_dir=temp_dir, - load_code_from_local=load_code_from_local, - use_pickle=use_pickle) + load_code_from_local=load_code_from_local) _global_node = ray.node.Node( ray_params, head=False, @@ -1364,12 +1360,10 @@ def _changeproctitle(title, next_title): def register_custom_serializer(cls, - serializer=None, - deserializer=None, + serializer, + deserializer, use_pickle=False, use_dict=False, - local=None, - job_id=None, class_id=None): """Registers custom functions for efficient object serialization. @@ -1377,15 +1371,6 @@ def register_custom_serializer(cls, `cls` across processes and nodes. This can be significantly faster than the Ray default fallbacks. Wraps `register_custom_serializer` underneath. - `use_pickle` tells Ray to automatically use cloudpickle for serialization, - and `use_dict` automatically uses `cls.__dict__`. - - When calling this function, you can only provide one of the following: - - 1. serializer and deserializer - 2. `use_pickle` - 3. `use_dict` - Args: cls (type): The class that ray should use this custom serializer for. serializer: The custom serializer that takes in a cls instance and @@ -1394,34 +1379,24 @@ def register_custom_serializer(cls, deserializer: The custom deserializer that takes in a serialized representation of the cls and outputs a cls instance. use_pickle and use_dict must be False if provided. - use_pickle (bool): If true, objects of this class will be - serialized using pickle. Must be False if - use_dict is true. - use_dict (bool): If true, objects of this class be serialized turning - their __dict__ fields into a dictionary. Must be False if - use_pickle is true. - local: Deprecated. - job_id: Deprecated. + use_pickle: Deprecated. + use_dict: Deprecated. class_id (str): Unique ID of the class. Autogenerated if None. """ - if job_id: + if use_pickle: raise DeprecationWarning( - "`job_id` is no longer a valid parameter and will be removed in " - "future versions of Ray. If this breaks your application, " + "`use_pickle` is no longer a valid parameter and will be removed " + "in future versions of Ray. If this breaks your application, " "see `SerializationContext.register_custom_serializer`.") - if local: + if use_dict: raise DeprecationWarning( - "`local` is no longer a valid parameter and will be removed in " - "future versions of Ray. If this breaks your application, " + "`use_pickle` is no longer a valid parameter and will be removed " + "in future versions of Ray. If this breaks your application, " "see `SerializationContext.register_custom_serializer`.") + assert serializer is not None and deserializer is not None context = global_worker.get_serialization_context() context.register_custom_serializer( - cls, - use_pickle=use_pickle, - use_dict=use_dict, - serializer=serializer, - deserializer=deserializer, - class_id=class_id) + cls, serializer, deserializer, class_id=class_id) def show_in_webui(message, key="", dtype="text"): diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index b4719285e..5f0285e27 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -98,7 +98,6 @@ if __name__ == "__main__": raylet_socket_name=args.raylet_name, temp_dir=args.temp_dir, load_code_from_local=args.load_code_from_local, - use_pickle=args.use_pickle, _internal_config=json.dumps(internal_config), )