mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 23:11:40 +08:00
Remove cloudpickle customization and just use plain cloudpickle. (#588)
* Remove augmentations of cloudpickle. * Entirely remove cloudpickle modifications. Just use plain cloudpickle.
This commit is contained in:
committed by
Philipp Moritz
parent
679910496e
commit
997aa35721
+3
-3
@@ -2,6 +2,7 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import cloudpickle as pickle
|
||||
import hashlib
|
||||
import inspect
|
||||
import json
|
||||
@@ -10,7 +11,6 @@ import redis
|
||||
import traceback
|
||||
|
||||
import ray.local_scheduler
|
||||
import ray.pickling as pickling
|
||||
import ray.signature as signature
|
||||
import ray.worker
|
||||
from ray.utils import random_string, binary_to_hex, hex_to_binary
|
||||
@@ -72,7 +72,7 @@ def fetch_and_register_actor(actor_class_key, worker):
|
||||
temporary_actor_method)
|
||||
|
||||
try:
|
||||
unpickled_class = pickling.loads(pickled_class)
|
||||
unpickled_class = pickle.loads(pickled_class)
|
||||
except Exception:
|
||||
# If an exception was thrown when the actor was imported, we record the
|
||||
# traceback and notify the scheduler of the failure.
|
||||
@@ -207,7 +207,7 @@ def export_actor_class(class_id, Class, actor_method_names, worker):
|
||||
d = {"driver_id": worker.task_driver_id.id(),
|
||||
"class_name": Class.__name__,
|
||||
"module": Class.__module__,
|
||||
"class": pickling.dumps(Class),
|
||||
"class": pickle.dumps(Class),
|
||||
"actor_method_names": json.dumps(list(actor_method_names))}
|
||||
worker.redis_client.hmset(key, d)
|
||||
worker.redis_client.rpush("Exports", key)
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
# Note that a little bit of code here is taken and slightly modified from the
|
||||
# pickler because it was not possible to change its behavior otherwise.
|
||||
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from ctypes import c_void_p
|
||||
from cloudpickle import pickle, cloudpickle, CloudPickler, load, loads
|
||||
|
||||
__all__ = ["load", "loads", "dump", "dumps"]
|
||||
|
||||
try:
|
||||
from ctypes import pythonapi
|
||||
pythonapi.PyCell_Set # Make sure this exists
|
||||
except:
|
||||
pythonapi = None
|
||||
|
||||
|
||||
def dump(obj, file, protocol=2):
|
||||
return BetterPickler(file, protocol).dump(obj)
|
||||
|
||||
|
||||
def dumps(obj):
|
||||
stringio = cloudpickle.StringIO()
|
||||
dump(obj, stringio)
|
||||
return stringio.getvalue()
|
||||
|
||||
|
||||
def _make_skel_func(code, closure, base_globals=None):
|
||||
"""Create a skeleton function object.
|
||||
|
||||
Creates a skeleton function object that contains just the provided code and
|
||||
the correct number of cells in func_closure. All other func attributes
|
||||
(e.g. func_globals) are empty.
|
||||
"""
|
||||
if base_globals is None:
|
||||
base_globals = {}
|
||||
base_globals["__builtins__"] = __builtins__
|
||||
return _make_skel_func.__class__(code, base_globals, None, None,
|
||||
tuple(closure))
|
||||
|
||||
|
||||
def _fill_function(func, globals, defaults, closure, dict):
|
||||
"""Fill in the resst of the function data.
|
||||
|
||||
This fills in the rest of function data into the skeleton function object
|
||||
that were created via _make_skel_func(), including closures.
|
||||
"""
|
||||
result = cloudpickle._fill_function(func, globals, defaults, dict)
|
||||
if pythonapi is not None:
|
||||
for i, v in enumerate(closure):
|
||||
pythonapi.PyCell_Set(c_void_p(id(result.__closure__[i])),
|
||||
c_void_p(id(v)))
|
||||
return result
|
||||
|
||||
|
||||
class BetterPickler(CloudPickler):
|
||||
def save_function_tuple(self, func):
|
||||
(code, f_globals, defaults,
|
||||
closure, dct, base_globals) = self.extract_func_data(func)
|
||||
|
||||
self.save(_fill_function)
|
||||
self.write(pickle.MARK)
|
||||
|
||||
self.save(_make_skel_func if pythonapi else cloudpickle._make_skel_func)
|
||||
self.save((code,
|
||||
(map(lambda _: cloudpickle._make_cell(None), closure)
|
||||
if closure and pythonapi is not None
|
||||
else closure),
|
||||
base_globals))
|
||||
self.write(pickle.REDUCE)
|
||||
self.memoize(func)
|
||||
|
||||
self.save(f_globals)
|
||||
self.save(defaults)
|
||||
self.save(closure)
|
||||
self.save(dct)
|
||||
self.write(pickle.TUPLE)
|
||||
self.write(pickle.REDUCE)
|
||||
|
||||
def save_cell(self, obj):
|
||||
self.save(cloudpickle._make_cell)
|
||||
self.save((obj.cell_contents,))
|
||||
self.write(pickle.REDUCE)
|
||||
dispatch = CloudPickler.dispatch.copy()
|
||||
dispatch[(lambda _: lambda: _)(0).__closure__[0].__class__] = save_cell
|
||||
@@ -2,8 +2,9 @@ from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import cloudpickle as pickle
|
||||
|
||||
import ray.numbuf
|
||||
import ray.pickling as pickling
|
||||
|
||||
|
||||
class RaySerializationException(Exception):
|
||||
@@ -123,7 +124,7 @@ def serialize(obj):
|
||||
class_id = type_to_class_id[type(obj)]
|
||||
|
||||
if class_id in classes_to_pickle:
|
||||
serialized_obj = {"data": pickling.dumps(obj),
|
||||
serialized_obj = {"data": pickle.dumps(obj),
|
||||
"pickle": True}
|
||||
elif class_id in custom_serializers:
|
||||
serialized_obj = {"data": custom_serializers[class_id](obj)}
|
||||
@@ -160,7 +161,7 @@ def deserialize(serialized_obj):
|
||||
|
||||
if "pickle" in serialized_obj:
|
||||
# The object was pickled, so unpickle it.
|
||||
obj = pickling.loads(serialized_obj["data"])
|
||||
obj = pickle.loads(serialized_obj["data"])
|
||||
else:
|
||||
assert class_id not in classes_to_pickle
|
||||
if class_id not in whitelisted_classes:
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import atexit
|
||||
import cloudpickle as pickle
|
||||
import collections
|
||||
import colorama
|
||||
import copy
|
||||
@@ -20,7 +21,6 @@ import traceback
|
||||
|
||||
# Ray modules
|
||||
import ray.experimental.state as state
|
||||
import ray.pickling as pickling
|
||||
import ray.serialization as serialization
|
||||
import ray.services as services
|
||||
import ray.signature as signature
|
||||
@@ -497,7 +497,7 @@ class Worker(object):
|
||||
# Attempt to pickle the function before we need it. This could fail, and
|
||||
# it is more convenient if the failure happens before we actually run the
|
||||
# function locally.
|
||||
pickled_function = pickling.dumps(function)
|
||||
pickled_function = pickle.dumps(function)
|
||||
|
||||
function_to_run_id = random_string()
|
||||
key = "FunctionsToRun:{}".format(function_to_run_id)
|
||||
@@ -1087,7 +1087,7 @@ def fetch_and_register_remote_function(key, worker=global_worker):
|
||||
num_gpus)
|
||||
|
||||
try:
|
||||
function = pickling.loads(serialized_function)
|
||||
function = pickle.loads(serialized_function)
|
||||
except:
|
||||
# If an exception was thrown when the remote function was imported, we
|
||||
# record the traceback and notify the scheduler of the failure.
|
||||
@@ -1117,7 +1117,7 @@ def fetch_and_execute_function_to_run(key, worker=global_worker):
|
||||
counter = worker.redis_client.hincrby(worker.node_ip_address, key, 1) - 1
|
||||
try:
|
||||
# Deserialize the function.
|
||||
function = pickling.loads(serialized_function)
|
||||
function = pickle.loads(serialized_function)
|
||||
# Run the function.
|
||||
function({"counter": counter})
|
||||
except:
|
||||
@@ -1843,7 +1843,7 @@ def export_remote_function(function_id, func_name, func, func_invoker,
|
||||
# Allow the function to reference itself as a global variable
|
||||
func.__globals__[func.__name__] = func_invoker
|
||||
try:
|
||||
pickled_func = pickling.dumps(func)
|
||||
pickled_func = pickle.dumps(func)
|
||||
finally:
|
||||
# Undo our changes
|
||||
if func_name_global_valid:
|
||||
|
||||
Reference in New Issue
Block a user