Deprecate use_pickle flag (#7474)

This commit is contained in:
Edward Oakes
2020-03-09 16:03:56 -07:00
committed by GitHub
parent 0c254295b0
commit 4ab80eafb9
12 changed files with 50 additions and 250 deletions
+2 -16
View File
@@ -1,19 +1,5 @@
from __future__ import absolute_import
import os
import sys
CLOUDPICKLE_PATH = os.path.dirname(os.path.realpath(__file__))
from ray.cloudpickle.cloudpickle_fast import * # noqa: F401, F403
if os.path.exists(os.path.join(CLOUDPICKLE_PATH, "..", "pickle5_files", "pickle5")):
HAS_PICKLE5 = True
else:
HAS_PICKLE5 = False
if sys.version_info[:2] >= (3, 8) or HAS_PICKLE5:
from ray.cloudpickle.cloudpickle_fast import *
FAST_CLOUDPICKLE_USED = True
else:
from ray.cloudpickle.cloudpickle import *
FAST_CLOUDPICKLE_USED = False
__version__ = '1.2.2.dev0'
__version__ = "1.2.2.dev0"
-1
View File
@@ -78,7 +78,6 @@ class Cluster:
"object_store_memory": 150 * 1024 * 1024, # 150 MiB
}
ray_params = ray.parameter.RayParams(**node_args)
ray_params.use_pickle = ray.cloudpickle.FAST_CLOUDPICKLE_USED
ray_params.update_if_absent(**default_kwargs)
if self.head_node is None:
node = ray.node.Node(
-5
View File
@@ -251,10 +251,6 @@ class Node:
def load_code_from_local(self):
return self._ray_params.load_code_from_local
@property
def use_pickle(self):
return self._ray_params.use_pickle
@property
def object_id_seed(self):
"""Get the seed for deterministic generation of object IDs"""
@@ -569,7 +565,6 @@ class Node:
include_java=self._ray_params.include_java,
java_worker_options=self._ray_params.java_worker_options,
load_code_from_local=self._ray_params.load_code_from_local,
use_pickle=self._ray_params.use_pickle,
fate_share=self.kernel_fate_share)
assert ray_constants.PROCESS_TYPE_RAYLET not in self.all_processes
self.all_processes[ray_constants.PROCESS_TYPE_RAYLET] = [process_info]
+5 -9
View File
@@ -5,6 +5,8 @@ from packaging import version
import ray.ray_constants as ray_constants
logger = logging.getLogger(__name__)
class RayParams:
"""A class used to store the parameters used by Ray.
@@ -79,7 +81,6 @@ class RayParams:
Java worker.
java_worker_options (list): The command options for Java worker.
load_code_from_local: Whether load code from local file or from GCS.
use_pickle: Whether data objects should be serialized with cloudpickle.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
"""
@@ -120,7 +121,6 @@ class RayParams:
include_java=False,
java_worker_options=None,
load_code_from_local=False,
use_pickle=False,
_internal_config=None):
self.object_id_seed = object_id_seed
self.redis_address = redis_address
@@ -155,7 +155,6 @@ class RayParams:
self.include_java = include_java
self.java_worker_options = java_worker_options
self.load_code_from_local = load_code_from_local
self.use_pickle = use_pickle
self._internal_config = _internal_config
self._check_usage()
@@ -209,9 +208,6 @@ class RayParams:
raise DeprecationWarning(
"The redirect_output argument is deprecated.")
if self.use_pickle:
assert (version.parse(
np.__version__) >= version.parse("1.16.0")), (
"numpy >= 1.16.0 required for use_pickle=True support. "
"You can use ray.init(use_pickle=False) for older numpy "
"versions, but this may be removed in future versions.")
if version.parse(np.__version__) < version.parse("1.16.0"):
logger.warning("Using ray with numpy < 1.16.0 will result in slow "
"serialization. Upgrade numpy if using with ray.")
+1 -8
View File
@@ -275,11 +275,6 @@ def dashboard(cluster_config_file, cluster_name, port):
is_flag=True,
default=False,
help="Specify whether load code from local file or GCS serialization.")
@click.option(
"--use-pickle/--no-use-pickle",
is_flag=True,
default=ray.cloudpickle.FAST_CLOUDPICKLE_USED,
help="Use pickle for serialization.")
def start(node_ip_address, redis_address, address, redis_port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port, memory,
@@ -287,8 +282,7 @@ def start(node_ip_address, redis_address, address, redis_port,
head, include_webui, webui_host, block, plasma_directory, huge_pages,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, use_pickle,
internal_config):
java_worker_options, load_code_from_local, internal_config):
if redis_address is not None:
raise DeprecationWarning("The --redis-address argument is "
"deprecated. Please use --address instead.")
@@ -334,7 +328,6 @@ def start(node_ip_address, redis_address, address, redis_port,
webui_host=webui_host,
java_worker_options=java_worker_options,
load_code_from_local=load_code_from_local,
use_pickle=use_pickle,
_internal_config=internal_config)
if head:
# Start Ray on the head node.
+6 -94
View File
@@ -130,8 +130,6 @@ class SerializationContext:
def __init__(self, worker):
self.worker = worker
assert worker.use_pickle
self.use_pickle = worker.use_pickle
self._thread_local = threading.local()
def actor_handle_serializer(obj):
@@ -210,8 +208,6 @@ class SerializationContext:
def _register_cloudpickle_serializer(self, cls, custom_serializer,
custom_deserializer):
assert pickle.FAST_CLOUDPICKLE_USED
def _CloudPicklerReducer(obj):
return custom_deserializer, (custom_serializer(obj), )
@@ -249,10 +245,6 @@ class SerializationContext:
self._thread_local.object_ids.add(object_id)
def _deserialize_pickle5_data(self, data):
if not self.use_pickle:
raise ValueError("Receiving pickle5 serialized objects "
"while the serialization context is "
"using a custom raw backend.")
try:
in_band, buffers = unpack_pickle5_buffers(data)
if len(buffers) > 0:
@@ -366,8 +358,6 @@ class SerializationContext:
else:
metadata = ray_constants.PICKLE5_BUFFER_METADATA
assert self.worker.use_pickle
assert ray.cloudpickle.FAST_CLOUDPICKLE_USED
writer = Pickle5Writer()
# TODO(swang): Check that contained_object_ids is empty.
try:
@@ -386,10 +376,8 @@ class SerializationContext:
def register_custom_serializer(self,
cls,
use_pickle=False,
use_dict=False,
serializer=None,
deserializer=None,
serializer,
deserializer,
local=False,
job_id=None,
class_id=None):
@@ -402,15 +390,8 @@ class SerializationContext:
Args:
cls (type): The class that ray should use this custom serializer
for.
use_pickle (bool): If true, then objects of this class will be
serialized using pickle.
use_dict: If true, then objects of this class be serialized
turning their __dict__ fields into a dictionary. Must be False
if use_pickle is true.
serializer: The custom serializer to use. This should be provided
if and only if use_pickle and use_dict are False.
deserializer: The custom deserializer to use. This should be
provided if and only if use_pickle and use_dict are False.
serializer: The custom serializer to use.
deserializer: The custom deserializer to use.
local: True if the serializers should only be registered on the
current worker. This should usually be False.
job_id: ID of the job that we want to register the class for.
@@ -421,23 +402,8 @@ class SerializationContext:
cannot be efficiently serialized by Ray.
ValueError: Raised if ray could not autogenerate a class_id.
"""
assert (serializer is None) == (deserializer is None), (
"The serializer/deserializer arguments must both be provided or "
"both not be provided.")
use_custom_serializer = (serializer is not None)
assert use_custom_serializer + use_pickle + use_dict == 1, (
"Exactly one of use_pickle, use_dict, or serializer/deserializer "
"must be specified.")
if self.worker.use_pickle and serializer is None:
# In this case it should do nothing.
return
if use_dict:
# Raise an exception if cls cannot be serialized
# efficiently by Ray.
check_serializable(cls)
assert serializer is not None and deserializer is not None, (
"Must provide serializer and deserializer.")
if class_id is None:
if not local:
@@ -470,7 +436,6 @@ class SerializationContext:
assert isinstance(job_id, JobID)
def register_class_for_serialization(worker_info):
assert worker_info["worker"].use_pickle
context = worker_info["worker"].get_serialization_context(job_id)
context._register_cloudpickle_serializer(cls, serializer,
deserializer)
@@ -482,56 +447,3 @@ class SerializationContext:
# Since we are pickling objects of this class, we don't actually
# need to ship the class definition.
register_class_for_serialization({"worker": self.worker})
def check_serializable(cls):
"""Throws an exception if Ray cannot serialize this class efficiently.
Args:
cls (type): The class to be serialized.
Raises:
Exception: An exception is raised if Ray cannot serialize this class
efficiently.
"""
if is_named_tuple(cls):
# This case works.
return
if not hasattr(cls, "__new__"):
print("The class {} does not have a '__new__' attribute and is "
"probably an old-stye class. Please make it a new-style class "
"by inheriting from 'object'.")
raise RayNotDictionarySerializable("The class {} does not have a "
"'__new__' attribute and is "
"probably an old-style class. We "
"do not support this. Please make "
"it a new-style class by "
"inheriting from 'object'."
.format(cls))
try:
obj = cls.__new__(cls)
except Exception:
raise RayNotDictionarySerializable("The class {} has overridden "
"'__new__', so Ray may not be "
"able to serialize it "
"efficiently.".format(cls))
if not hasattr(obj, "__dict__"):
raise RayNotDictionarySerializable("Objects of the class {} do not "
"have a '__dict__' attribute, so "
"Ray cannot serialize it "
"efficiently.".format(cls))
if hasattr(obj, "__slots__"):
raise RayNotDictionarySerializable("The class {} uses '__slots__', so "
"Ray may not be able to serialize "
"it efficiently.".format(cls))
def is_named_tuple(cls):
"""Return True if cls is a namedtuple and False otherwise."""
b = cls.__bases__
if len(b) != 1 or b[0] != tuple:
return False
f = getattr(cls, "_fields", None)
if not isinstance(f, tuple):
return False
return all(type(n) == str for n in f)
-4
View File
@@ -1185,7 +1185,6 @@ def start_raylet(redis_address,
include_java=False,
java_worker_options=None,
load_code_from_local=False,
use_pickle=False,
fate_share=None):
"""Start a raylet, which is a combined local scheduler and object manager.
@@ -1218,7 +1217,6 @@ def start_raylet(redis_address,
include_java (bool): If True, the raylet backend can also support
Java worker.
java_worker_options (list): The command options for Java worker.
use_pickle (bool): If True, use cloudpickle for serialization.
Returns:
ProcessInfo for the process that was started.
"""
@@ -1281,8 +1279,6 @@ def start_raylet(redis_address,
if load_code_from_local:
start_worker_command += ["--load-code-from-local"]
if use_pickle:
start_worker_command += ["--use-pickle"]
command = [
RAYLET_EXECUTABLE,
+17 -49
View File
@@ -23,6 +23,17 @@ from ray.exceptions import RayTimeoutError
logger = logging.getLogger(__name__)
def is_named_tuple(cls):
"""Return True if cls is a namedtuple and False otherwise."""
b = cls.__bases__
if len(b) != 1 or b[0] != tuple:
return False
f = getattr(cls, "_fields", None)
if not isinstance(f, tuple):
return False
return all(type(n) == str for n in f)
# https://github.com/ray-project/ray/issues/6662
def test_ignore_http_proxy(shutdown_only):
ray.init(num_cpus=1)
@@ -169,7 +180,7 @@ def test_fair_queueing(shutdown_only):
assert len(ready) == 1000, len(ready)
def complex_serialization(use_pickle):
def test_complex_serialization(ray_start_regular):
def assert_equal(obj1, obj2):
module_numpy = (type(obj1).__module__ == np.__name__
or type(obj2).__module__ == np.__name__)
@@ -208,8 +219,7 @@ def complex_serialization(use_pickle):
obj1, obj2))
for i in range(len(obj1)):
assert_equal(obj1[i], obj2[i])
elif (ray.serialization.is_named_tuple(type(obj1))
or ray.serialization.is_named_tuple(type(obj2))):
elif (is_named_tuple(type(obj1)) or is_named_tuple(type(obj2))):
assert len(obj1) == len(obj2), (
"Objects {} and {} are named "
"tuples with different lengths.".format(obj1, obj2))
@@ -369,15 +379,6 @@ def complex_serialization(use_pickle):
assert ray.get(ray.put(s)).readline() == line
def test_complex_serialization(ray_start_regular):
complex_serialization(use_pickle=False)
def test_complex_serialization_with_pickle(shutdown_only):
ray.init(use_pickle=True)
complex_serialization(use_pickle=True)
def test_numpy_serialization(ray_start_regular):
array = np.zeros(314)
from ray.cloudpickle import dumps
@@ -421,8 +422,6 @@ def test_numpy_subclass_serialization_pickle(ray_start_regular):
print(self.constant)
constant = MyNumpyConstant(123)
ray.register_custom_serializer(type(constant), use_pickle=True)
repr_orig = repr(constant)
repr_ser = repr(ray.get(ray.put(constant)))
assert repr_orig == repr_ser
@@ -515,16 +514,9 @@ def test_ray_recursive_objects(ray_start_regular):
# Create a list of recursive objects.
recursive_objects = [lst, a1, a2, a3, d1]
if ray.worker.global_worker.use_pickle:
# Serialize the recursive objects.
for obj in recursive_objects:
ray.put(obj)
else:
# Check that exceptions are thrown when we serialize the recursive
# objects.
for obj in recursive_objects:
with pytest.raises(Exception):
ray.put(obj)
# Serialize the recursive objects.
for obj in recursive_objects:
ray.put(obj)
def test_reducer_override_no_reference_cycle(ray_start_regular):
@@ -647,7 +639,7 @@ def test_put_get(shutdown_only):
assert value_before == value_after
def custom_serializers():
def test_custom_serializers(ray_start_regular):
class Foo:
def __init__(self):
self.x = 3
@@ -677,30 +669,6 @@ def custom_serializers():
assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2")
def test_custom_serializers(ray_start_regular):
custom_serializers()
def test_custom_serializers_with_pickle(shutdown_only):
ray.init(use_pickle=True)
custom_serializers()
class Foo:
def __init__(self):
self.x = 4
# Test the pickle serialization backend without serializer.
# NOTE: 'use_pickle' here is different from 'use_pickle' in
# ray.init
ray.register_custom_serializer(Foo, use_pickle=True)
@ray.remote
def f():
return Foo()
assert type(ray.get(f.remote())) == Foo
def test_serialization_final_fallback(ray_start_regular):
pytest.importorskip("catboost")
# This test will only run when "catboost" is installed.
-20
View File
@@ -594,26 +594,6 @@ print("success")
ray.get(f.remote())
@pytest.mark.parametrize(
"call_ray_start", ["ray start --head --num-cpus=1 --use-pickle"],
indirect=True)
def test_use_pickle(call_ray_start):
address = call_ray_start
ray.init(address=address, use_pickle=True)
assert ray.worker.global_worker.use_pickle
x = (2, "hello")
@ray.remote
def f(x):
assert x == (2, "hello")
assert ray.worker.global_worker.use_pickle
return (3, "world")
assert ray.get(f.remote(x)) == (3, "world")
if __name__ == "__main__":
import pytest
import sys
+18 -43
View File
@@ -145,11 +145,6 @@ class Worker:
self.check_connected()
return self.node.load_code_from_local
@property
def use_pickle(self):
self.check_connected()
return self.node.use_pickle
@property
def current_job_id(self):
if hasattr(self, "core_worker"):
@@ -555,7 +550,7 @@ def init(address=None,
raylet_socket_name=None,
temp_dir=None,
load_code_from_local=False,
use_pickle=ray.cloudpickle.FAST_CLOUDPICKLE_USED,
use_pickle=True,
_internal_config=None):
"""Connect to an existing Ray cluster or start one and connect to it.
@@ -649,7 +644,7 @@ def init(address=None,
directory for the Ray process.
load_code_from_local: Whether code should be loaded from a local module
or from the GCS.
use_pickle: Whether data objects should be serialized with cloudpickle.
use_pickle: Deprecated.
_internal_config (str): JSON configuration for overriding
RayConfig defaults. For testing purposes ONLY.
@@ -661,6 +656,9 @@ def init(address=None,
arguments is passed in.
"""
if not use_pickle:
raise DeprecationWarning("The use_pickle argument is deprecated.")
if redis_address is not None:
raise DeprecationWarning("The redis_address argument is deprecated. "
"Please use address instead.")
@@ -731,7 +729,6 @@ def init(address=None,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
load_code_from_local=load_code_from_local,
use_pickle=use_pickle,
_internal_config=_internal_config,
)
# Start the Ray processes. We set shutdown_at_exit=False because we
@@ -792,8 +789,7 @@ def init(address=None,
redis_password=redis_password,
object_id_seed=object_id_seed,
temp_dir=temp_dir,
load_code_from_local=load_code_from_local,
use_pickle=use_pickle)
load_code_from_local=load_code_from_local)
_global_node = ray.node.Node(
ray_params,
head=False,
@@ -1364,12 +1360,10 @@ def _changeproctitle(title, next_title):
def register_custom_serializer(cls,
serializer=None,
deserializer=None,
serializer,
deserializer,
use_pickle=False,
use_dict=False,
local=None,
job_id=None,
class_id=None):
"""Registers custom functions for efficient object serialization.
@@ -1377,15 +1371,6 @@ def register_custom_serializer(cls,
`cls` across processes and nodes. This can be significantly faster than
the Ray default fallbacks. Wraps `register_custom_serializer` underneath.
`use_pickle` tells Ray to automatically use cloudpickle for serialization,
and `use_dict` automatically uses `cls.__dict__`.
When calling this function, you can only provide one of the following:
1. serializer and deserializer
2. `use_pickle`
3. `use_dict`
Args:
cls (type): The class that ray should use this custom serializer for.
serializer: The custom serializer that takes in a cls instance and
@@ -1394,34 +1379,24 @@ def register_custom_serializer(cls,
deserializer: The custom deserializer that takes in a serialized
representation of the cls and outputs a cls instance. use_pickle
and use_dict must be False if provided.
use_pickle (bool): If true, objects of this class will be
serialized using pickle. Must be False if
use_dict is true.
use_dict (bool): If true, objects of this class be serialized turning
their __dict__ fields into a dictionary. Must be False if
use_pickle is true.
local: Deprecated.
job_id: Deprecated.
use_pickle: Deprecated.
use_dict: Deprecated.
class_id (str): Unique ID of the class. Autogenerated if None.
"""
if job_id:
if use_pickle:
raise DeprecationWarning(
"`job_id` is no longer a valid parameter and will be removed in "
"future versions of Ray. If this breaks your application, "
"`use_pickle` is no longer a valid parameter and will be removed "
"in future versions of Ray. If this breaks your application, "
"see `SerializationContext.register_custom_serializer`.")
if local:
if use_dict:
raise DeprecationWarning(
"`local` is no longer a valid parameter and will be removed in "
"future versions of Ray. If this breaks your application, "
"`use_pickle` is no longer a valid parameter and will be removed "
"in future versions of Ray. If this breaks your application, "
"see `SerializationContext.register_custom_serializer`.")
assert serializer is not None and deserializer is not None
context = global_worker.get_serialization_context()
context.register_custom_serializer(
cls,
use_pickle=use_pickle,
use_dict=use_dict,
serializer=serializer,
deserializer=deserializer,
class_id=class_id)
cls, serializer, deserializer, class_id=class_id)
def show_in_webui(message, key="", dtype="text"):
-1
View File
@@ -98,7 +98,6 @@ if __name__ == "__main__":
raylet_socket_name=args.raylet_name,
temp_dir=args.temp_dir,
load_code_from_local=args.load_code_from_local,
use_pickle=args.use_pickle,
_internal_config=json.dumps(internal_config),
)