Ref count objects created with ray.put (#5590)

* wip

* add weakref option

* tune util

* test ref

* doc

* fix tests
This commit is contained in:
Eric Liang
2019-08-30 16:43:23 -07:00
committed by GitHub
parent 93e103135b
commit bea43c85b1
6 changed files with 77 additions and 23 deletions
+7
View File
@@ -130,6 +130,7 @@ cdef class UniqueID(BaseID):
cdef class ObjectID(BaseID):
cdef CObjectID data
cdef object buffer_ref
def __init__(self, id):
check_id(id)
@@ -153,6 +154,12 @@ cdef class ObjectID(BaseID):
def task_id(self):
return TaskID(self.data.TaskId().Binary())
def set_buffer_ref(self, ref):
self.buffer_ref = ref
def get_buffer_ref(self):
return self.buffer_ref
cdef size_t hash(self):
return self.data.Hash()
+37 -1
View File
@@ -3028,11 +3028,47 @@ def test_shutdown_disconnect_global_state():
assert str(e.value).endswith("ray.init has been called.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_put_pins_object(ray_start_object_store_memory):
x_id = ray.put("HI")
x_copy = ray.ObjectID(x_id.binary())
assert ray.get(x_copy) == "HI"
# x cannot be evicted since x_id pins it
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
assert ray.get(x_id) == "HI"
assert ray.get(x_copy) == "HI"
# now it can be evicted since x_id pins it but x_copy does not
del x_id
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(x_copy)
# weakref put
y_id = ray.put("HI", weakref=True)
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(y_id)
@ray.remote
def check_no_buffer_ref(x):
assert x[0].get_buffer_ref() is None
z_id = ray.put("HI")
assert z_id.get_buffer_ref() is not None
ray.get(check_no_buffer_ref.remote([z_id]))
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_redis_lru_with_set(ray_start_object_store_memory):
x = np.zeros(8 * 10**7, dtype=np.uint8)
x_id = ray.put(x)
x_id = ray.put(x, weakref=True)
# Remove the object from the object table to simulate Redis LRU eviction.
removed = False
+2 -2
View File
@@ -50,7 +50,7 @@ class TestMemoryLimits(unittest.TestCase):
def testTooLargeAllocation(self):
try:
ray.init(num_cpus=1, driver_object_store_memory=100 * MB)
ray.put(np.zeros(50 * MB, dtype=np.uint8))
ray.put(np.zeros(50 * MB, dtype=np.uint8), weakref=True)
self.assertRaises(
OBJECT_TOO_LARGE,
lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8)))
@@ -64,7 +64,7 @@ class TestMemoryLimits(unittest.TestCase):
num_cpus=1,
object_store_memory=300 * MB,
driver_object_store_memory=driver_quota)
z = ray.put("hi")
z = ray.put("hi", weakref=True)
a = LightActor._remote(object_store_memory=a_quota)
b = GreedyActor._remote(object_store_memory=b_quota)
for _ in range(5):
@@ -19,7 +19,7 @@ class TestUnreconstructableErrors(unittest.TestCase):
ray.shutdown()
def testDriverPutEvictedCannotReconstruct(self):
x_id = ray.put(np.zeros(1 * 1024 * 1024))
x_id = ray.put(np.zeros(1 * 1024 * 1024), weakref=True)
ray.get(x_id)
for _ in range(20):
ray.put(np.zeros(10 * 1024 * 1024))
+6 -17
View File
@@ -2,7 +2,6 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import base64
import copy
import logging
import threading
@@ -99,27 +98,17 @@ class UtilMonitor(Thread):
def pin_in_object_store(obj):
"""Pin an object in the object store.
"""Deprecated, use ray.put(value, weakref=False) instead."""
It will be available as long as the pinning process is alive. The pinned
object can be retrieved by calling get_pinned_object on the identifier
returned by this call.
"""
obj_id = ray.put(_to_pinnable(obj))
_pinned_objects.append(ray.get(obj_id))
return "{}{}".format(PINNED_OBJECT_PREFIX,
base64.b64encode(obj_id.binary()).decode("utf-8"))
obj_id = ray.put(obj, weakref=False)
_pinned_objects.append(obj_id)
return obj_id
def get_pinned_object(pinned_id):
"""Retrieve a pinned object from the object store."""
"""Deprecated."""
from ray import ObjectID
return _from_pinnable(
ray.get(
ObjectID(base64.b64decode(pinned_id[len(PINNED_OBJECT_PREFIX):]))))
return ray.get(pinned_id)
class warn_if_slow(object):
+24 -2
View File
@@ -2340,11 +2340,18 @@ def get(object_ids):
return values
def put(value):
def put(value, weakref=False):
"""Store an object in the object store.
The object may not be evicted while a reference to the returned ID exists.
Note that this pinning only applies to the particular object ID returned
by put, not object IDs in general.
Args:
value: The Python object to be stored.
weakref: If set, allows the object to be evicted while a reference
to the returned ID exists. You might want to set this if putting
a lot of objects that you might not need in the future.
Returns:
The object ID assigned to this value.
@@ -2359,8 +2366,23 @@ def put(value):
worker.current_task_id,
worker.task_context.put_index,
)
worker.put_object(object_id, value)
try:
worker.put_object(object_id, value)
except pyarrow.plasma.PlasmaStoreFull:
logger.info(
"Put failed since the value was either too large or the "
"store was full of pinned objects. If you are putting "
"and holding references to a lot of object ids, consider "
"ray.put(value, weakref=True) to allow object data to "
"be evicted early.")
raise
worker.task_context.put_index += 1
# Pin the object buffer with the returned id. This avoids put returns
# from getting evicted out from under the id.
if not weakref and not worker.mode == LOCAL_MODE:
object_id.set_buffer_ref(
worker.plasma_client.get_buffers(
[pyarrow.plasma.ObjectID(object_id.binary())]))
return object_id