From ebb431a95a6800f79dc7308589ddab90a006624c Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 8 Sep 2019 15:05:00 -0700 Subject: [PATCH] Add internal_api.pin_object_data() for pinning arbitrary object ids (#5637) * add intenral pin method * add pin * update --- python/ray/internal/internal_api.py | 44 ++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 9a5a98187..ff6dd7fc5 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -4,8 +4,44 @@ from __future__ import print_function import ray.worker from ray import profiling +import pyarrow -__all__ = ["free"] +__all__ = ["free", "pin_object_data"] + + +def pin_object_data(obj_id): + """Pin the object data referenced by this object id in memory. + + The object data cannot be evicted while there exists a Python reference to + the object id passed to this function. In order to pin the object, we will + also download the object to the current node (this overhead is unavoidable + for now without a distributed ref counting solution). + + Examples: + >>> x_id = f.remote() + >>> x_id = pin_object_id(x_id) # x pinned, cannot be evicted + >>> del x_id # x can be evicted again + + Note that ray will automatically do this for objects created with + ray.put() already, unless you ray.put with weakref=True. + """ + + ray.get(obj_id) + obj_id.set_buffer_ref( + ray.worker.global_worker.plasma_client.get_buffers( + [pyarrow.plasma.ObjectID(obj_id.binary())])) + + +def unpin_object_data(obj_id): + """Unpin an object pinned by pin_object_id. + + Examples: + >>> x_id = f.remote() + >>> pin_object_id(x_id) + >>> unpin_object_id(x_id) # as if the pin didn't happen + """ + + obj_id.set_buffer_ref(None) def free(object_ids, local_only=False, delete_creating_tasks=False): @@ -21,6 +57,11 @@ def free(object_ids, local_only=False, delete_creating_tasks=False): the some of the objects are in use, object stores will delete them later when the ref count is down to 0. + Examples: + >>> x_id = f.remote() + >>> ray.get(x_id) # wait for x to be created first + >>> free([x_id]) # unpin & delete x globally + Args: object_ids (List[ObjectID]): List of object IDs to delete. local_only (bool): Whether only deleting the list of objects in local @@ -42,6 +83,7 @@ def free(object_ids, local_only=False, delete_creating_tasks=False): if not isinstance(object_id, ray.ObjectID): raise TypeError("Attempting to call `free` on the value {}, " "which is not an ray.ObjectID.".format(object_id)) + unpin_object_data(object_id) if ray.worker._mode() == ray.worker.LOCAL_MODE: worker.local_mode_manager.free(object_ids)