diff --git a/python/ray/actor.py b/python/ray/actor.py
index a28dcb969..b7499a95f 100644
--- a/python/ray/actor.py
+++ b/python/ray/actor.py
@@ -13,7 +13,12 @@ import ray.local_scheduler
 import ray.ray_constants as ray_constants
 import ray.signature as signature
 import ray.worker
-from ray.utils import _random_string, is_cython, push_error_to_driver
+from ray.utils import (
+    _random_string,
+    check_oversized_pickle,
+    is_cython,
+    push_error_to_driver,
+)
 
 DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1
 
@@ -393,19 +398,8 @@ def export_actor_class(class_id, Class, actor_method_names,
         "actor_method_names": json.dumps(list(actor_method_names))
     }
 
-    if (len(actor_class_info["class"]) >
-            ray_constants.PICKLE_OBJECT_WARNING_SIZE):
-        warning_message = ("Warning: The actor {} has size {} when pickled. "
-                           "It will be stored in Redis, which could cause "
-                           "memory issues. This may mean that the actor "
-                           "definition uses a large array or other object."
-                           .format(actor_class_info["class_name"],
-                                   len(actor_class_info["class"])))
-        ray.utils.push_error_to_driver(
-            worker,
-            ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
-            warning_message,
-            driver_id=worker.task_driver_id.id())
+    check_oversized_pickle(actor_class_info["class"],
+                           actor_class_info["class_name"], "actor", worker)
 
     if worker.mode is None:
         # This means that 'ray.init()' has not been called yet and so we must
diff --git a/python/ray/utils.py b/python/ray/utils.py
index 5f51b6f24..db19f69bb 100644
--- a/python/ray/utils.py
+++ b/python/ray/utils.py
@@ -268,3 +268,28 @@ def merge_dicts(d1, d2):
     d = d1.copy()
     d.update(d2)
     return d
+
+
+def check_oversized_pickle(pickled, name, obj_type, worker):
+    """Send a warning message if the pickled object is too large.
+
+    Args:
+        pickled: the pickled object.
+        name: name of the pickled object.
+        obj_type: type of the pickled object, can be 'function',
+            'remote function', 'actor', or 'object'.
+        worker: the worker used to send warning message.
+    """
+    length = len(pickled)
+    if length <= ray_constants.PICKLE_OBJECT_WARNING_SIZE:
+        return
+    warning_message = (
+        "Warning: The {} {} has size {} when pickled. "
+        "It will be stored in Redis, which could cause memory issues. "
+        "This may mean that its definition uses a large array or other object."
+    ).format(obj_type, name, length)
+    push_error_to_driver(
+        worker,
+        ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
+        warning_message,
+        driver_id=worker.task_driver_id.id())
diff --git a/python/ray/worker.py b/python/ray/worker.py
index 266d3e48e..a0f72aa9d 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -30,7 +30,12 @@ import ray.signature
 import ray.local_scheduler
 import ray.plasma
 import ray.ray_constants as ray_constants
-from ray.utils import random_string, binary_to_hex, is_cython
+from ray.utils import (
+    binary_to_hex,
+    check_oversized_pickle,
+    is_cython,
+    random_string,
+)
 
 SCRIPT_MODE = 0
 WORKER_MODE = 1
@@ -653,18 +658,8 @@ class Worker(object):
             else:
                 del function.__globals__[function.__name__]
 
-        if len(pickled_function) > ray_constants.PICKLE_OBJECT_WARNING_SIZE:
-            warning_message = ("Warning: The remote function {} has size {} "
-                               "when pickled. It will be stored in Redis, "
-                               "which could cause memory issues. This may "
-                               "mean that the function definition uses a "
-                               "large array or other object.".format(
-                                   function_name, len(pickled_function)))
-            ray.utils.push_error_to_driver(
-                self,
-                ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
-                warning_message,
-                driver_id=self.task_driver_id.id())
+        check_oversized_pickle(pickled_function, function_name,
+                               "remote function", self)
 
         self.redis_client.hmset(
             key, {
@@ -714,20 +709,8 @@ class Worker(object):
                 # we don't need to export it again.
                 return
 
-            if (len(pickled_function) >
-                    ray_constants.PICKLE_OBJECT_WARNING_SIZE):
-                warning_message = ("Warning: The function {} has size {} when "
-                                   "pickled. It will be stored in Redis, "
-                                   "which could cause memory issues. This may "
-                                   "mean that the remote function definition "
-                                   "uses a large array or other object."
-                                   .format(function.__name__,
-                                           len(pickled_function)))
-                ray.utils.push_error_to_driver(
-                    self,
-                    ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
-                    warning_message,
-                    driver_id=self.task_driver_id.id())
+            check_oversized_pickle(pickled_function, function.__name__,
+                                   "function", self)
 
             # Run the function on all workers.
             self.redis_client.hmset(