diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index a695add8a..284cbfd81 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -130,6 +130,7 @@ cdef class UniqueID(BaseID): cdef class ObjectID(BaseID): cdef CObjectID data + cdef object buffer_ref def __init__(self, id): check_id(id) @@ -153,6 +154,12 @@ cdef class ObjectID(BaseID): def task_id(self): return TaskID(self.data.TaskId().Binary()) + def set_buffer_ref(self, ref): + self.buffer_ref = ref + + def get_buffer_ref(self): + return self.buffer_ref + cdef size_t hash(self): return self.data.Hash() diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 28f3cea75..28cf90256 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -3028,11 +3028,47 @@ def test_shutdown_disconnect_global_state(): assert str(e.value).endswith("ray.init has been called.") +@pytest.mark.parametrize( + "ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True) +def test_put_pins_object(ray_start_object_store_memory): + x_id = ray.put("HI") + x_copy = ray.ObjectID(x_id.binary()) + assert ray.get(x_copy) == "HI" + + # x cannot be evicted since x_id pins it + for _ in range(10): + ray.put(np.zeros(10 * 1024 * 1024)) + assert ray.get(x_id) == "HI" + assert ray.get(x_copy) == "HI" + + # now it can be evicted since x_id pins it but x_copy does not + del x_id + for _ in range(10): + ray.put(np.zeros(10 * 1024 * 1024)) + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(x_copy) + + # weakref put + y_id = ray.put("HI", weakref=True) + for _ in range(10): + ray.put(np.zeros(10 * 1024 * 1024)) + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(y_id) + + @ray.remote + def check_no_buffer_ref(x): + assert x[0].get_buffer_ref() is None + + z_id = ray.put("HI") + assert z_id.get_buffer_ref() is not None + ray.get(check_no_buffer_ref.remote([z_id])) + + @pytest.mark.parametrize( "ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True) def test_redis_lru_with_set(ray_start_object_store_memory): x = np.zeros(8 * 10**7, dtype=np.uint8) - x_id = ray.put(x) + x_id = ray.put(x, weakref=True) # Remove the object from the object table to simulate Redis LRU eviction. removed = False diff --git a/python/ray/tests/test_memory_limits.py b/python/ray/tests/test_memory_limits.py index 59a8dba9b..bff1f7e2f 100644 --- a/python/ray/tests/test_memory_limits.py +++ b/python/ray/tests/test_memory_limits.py @@ -50,7 +50,7 @@ class TestMemoryLimits(unittest.TestCase): def testTooLargeAllocation(self): try: ray.init(num_cpus=1, driver_object_store_memory=100 * MB) - ray.put(np.zeros(50 * MB, dtype=np.uint8)) + ray.put(np.zeros(50 * MB, dtype=np.uint8), weakref=True) self.assertRaises( OBJECT_TOO_LARGE, lambda: ray.put(np.zeros(200 * MB, dtype=np.uint8))) @@ -64,7 +64,7 @@ class TestMemoryLimits(unittest.TestCase): num_cpus=1, object_store_memory=300 * MB, driver_object_store_memory=driver_quota) - z = ray.put("hi") + z = ray.put("hi", weakref=True) a = LightActor._remote(object_store_memory=a_quota) b = GreedyActor._remote(object_store_memory=b_quota) for _ in range(5): diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index 86fd3fdfd..88d2b429d 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -19,7 +19,7 @@ class TestUnreconstructableErrors(unittest.TestCase): ray.shutdown() def testDriverPutEvictedCannotReconstruct(self): - x_id = ray.put(np.zeros(1 * 1024 * 1024)) + x_id = ray.put(np.zeros(1 * 1024 * 1024), weakref=True) ray.get(x_id) for _ in range(20): ray.put(np.zeros(10 * 1024 * 1024)) diff --git a/python/ray/tune/util.py b/python/ray/tune/util.py index d33a74d58..256411ace 100644 --- a/python/ray/tune/util.py +++ b/python/ray/tune/util.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import base64 import copy import logging import threading @@ -99,27 +98,17 @@ class UtilMonitor(Thread): def pin_in_object_store(obj): - """Pin an object in the object store. + """Deprecated, use ray.put(value, weakref=False) instead.""" - It will be available as long as the pinning process is alive. The pinned - object can be retrieved by calling get_pinned_object on the identifier - returned by this call. - """ - - obj_id = ray.put(_to_pinnable(obj)) - _pinned_objects.append(ray.get(obj_id)) - return "{}{}".format(PINNED_OBJECT_PREFIX, - base64.b64encode(obj_id.binary()).decode("utf-8")) + obj_id = ray.put(obj, weakref=False) + _pinned_objects.append(obj_id) + return obj_id def get_pinned_object(pinned_id): - """Retrieve a pinned object from the object store.""" + """Deprecated.""" - from ray import ObjectID - - return _from_pinnable( - ray.get( - ObjectID(base64.b64decode(pinned_id[len(PINNED_OBJECT_PREFIX):])))) + return ray.get(pinned_id) class warn_if_slow(object): diff --git a/python/ray/worker.py b/python/ray/worker.py index 54d71305b..31a8930f7 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -2340,11 +2340,18 @@ def get(object_ids): return values -def put(value): +def put(value, weakref=False): """Store an object in the object store. + The object may not be evicted while a reference to the returned ID exists. + Note that this pinning only applies to the particular object ID returned + by put, not object IDs in general. + Args: value: The Python object to be stored. + weakref: If set, allows the object to be evicted while a reference + to the returned ID exists. You might want to set this if putting + a lot of objects that you might not need in the future. Returns: The object ID assigned to this value. @@ -2359,8 +2366,23 @@ def put(value): worker.current_task_id, worker.task_context.put_index, ) - worker.put_object(object_id, value) + try: + worker.put_object(object_id, value) + except pyarrow.plasma.PlasmaStoreFull: + logger.info( + "Put failed since the value was either too large or the " + "store was full of pinned objects. If you are putting " + "and holding references to a lot of object ids, consider " + "ray.put(value, weakref=True) to allow object data to " + "be evicted early.") + raise worker.task_context.put_index += 1 + # Pin the object buffer with the returned id. This avoids put returns + # from getting evicted out from under the id. + if not weakref and not worker.mode == LOCAL_MODE: + object_id.set_buffer_ref( + worker.plasma_client.get_buffers( + [pyarrow.plasma.ObjectID(object_id.binary())])) return object_id