Add internal_api.pin_object_data() for pinning arbitrary object ids (#5637)

* add intenral pin method

* add pin

* update
This commit is contained in:
Eric Liang
2019-09-08 15:05:00 -07:00
committed by Stephanie Wang
parent d8f5804690
commit ebb431a95a
+43 -1
View File
@@ -4,8 +4,44 @@ from __future__ import print_function
import ray.worker
from ray import profiling
import pyarrow
__all__ = ["free"]
__all__ = ["free", "pin_object_data"]
def pin_object_data(obj_id):
"""Pin the object data referenced by this object id in memory.
The object data cannot be evicted while there exists a Python reference to
the object id passed to this function. In order to pin the object, we will
also download the object to the current node (this overhead is unavoidable
for now without a distributed ref counting solution).
Examples:
>>> x_id = f.remote()
>>> x_id = pin_object_id(x_id) # x pinned, cannot be evicted
>>> del x_id # x can be evicted again
Note that ray will automatically do this for objects created with
ray.put() already, unless you ray.put with weakref=True.
"""
ray.get(obj_id)
obj_id.set_buffer_ref(
ray.worker.global_worker.plasma_client.get_buffers(
[pyarrow.plasma.ObjectID(obj_id.binary())]))
def unpin_object_data(obj_id):
"""Unpin an object pinned by pin_object_id.
Examples:
>>> x_id = f.remote()
>>> pin_object_id(x_id)
>>> unpin_object_id(x_id) # as if the pin didn't happen
"""
obj_id.set_buffer_ref(None)
def free(object_ids, local_only=False, delete_creating_tasks=False):
@@ -21,6 +57,11 @@ def free(object_ids, local_only=False, delete_creating_tasks=False):
the some of the objects are in use, object stores will delete them later
when the ref count is down to 0.
Examples:
>>> x_id = f.remote()
>>> ray.get(x_id) # wait for x to be created first
>>> free([x_id]) # unpin & delete x globally
Args:
object_ids (List[ObjectID]): List of object IDs to delete.
local_only (bool): Whether only deleting the list of objects in local
@@ -42,6 +83,7 @@ def free(object_ids, local_only=False, delete_creating_tasks=False):
if not isinstance(object_id, ray.ObjectID):
raise TypeError("Attempting to call `free` on the value {}, "
"which is not an ray.ObjectID.".format(object_id))
unpin_object_data(object_id)
if ray.worker._mode() == ray.worker.LOCAL_MODE:
worker.local_mode_manager.free(object_ids)