Basic reference counting & pinning (#6554)

This commit is contained in:
Edward Oakes
2020-01-06 17:30:26 -06:00
committed by GitHub
parent c9855c9769
commit 2a4d2c6e9e
34 changed files with 556 additions and 317 deletions
-1
View File
@@ -38,7 +38,6 @@ cdef class BaseID:
cdef class ObjectID(BaseID):
cdef:
CObjectID data
object buffer_ref
# Flag indicating whether or not this object ID was added to the set
# of active IDs in the core worker so we know whether we should clean
# it up.
+14 -4
View File
@@ -844,7 +844,8 @@ cdef class CoreWorker:
if object_id is None:
with nogil:
check_status(self.core_worker.get().Create(
metadata, data_size, c_object_id, data))
metadata, data_size,
c_object_id, data))
else:
c_object_id[0] = object_id.native()
with nogil:
@@ -869,20 +870,29 @@ cdef class CoreWorker:
return data.get() == NULL
def put_serialized_object(self, serialized_object,
ObjectID object_id=None):
ObjectID object_id=None,
c_bool pin_object=True):
cdef:
CObjectID c_object_id
shared_ptr[CBuffer] data
shared_ptr[CBuffer] metadata
# The object won't be pinned if an ObjectID is provided by the
# user (because we can't track its lifetime to unpin). Note that
# the API to do this isn't supported as a public API.
c_bool owns_object = object_id is None
metadata = string_to_buffer(serialized_object.metadata)
total_bytes = serialized_object.total_bytes
object_already_exists = self._create_put_buffer(
metadata, total_bytes, object_id, &c_object_id, &data)
metadata, total_bytes, object_id,
&c_object_id, &data)
if not object_already_exists:
write_serialized_object(serialized_object, data)
with nogil:
check_status(
self.core_worker.get().Seal(c_object_id))
self.core_worker.get().Seal(
c_object_id, owns_object, pin_object))
return ObjectID(c_object_id.Binary())
def wait(self, object_ids, int num_returns, int64_t timeout_ms,
+4 -3
View File
@@ -133,12 +133,13 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus Put(const CRayObject &object, CObjectID *object_id)
CRayStatus Put(const CRayObject &object, const CObjectID &object_id)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, CObjectID *object_id,
shared_ptr[CBuffer] *data)
const size_t data_size,
CObjectID *object_id, shared_ptr[CBuffer] *data)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size, const CObjectID &object_id,
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id)
CRayStatus Seal(const CObjectID &object_id, c_bool owns_object,
c_bool pin_object)
CRayStatus Get(const c_vector[CObjectID] &ids, int64_t timeout_ms,
c_vector[shared_ptr[CRayObject]] *results)
CRayStatus Contains(const CObjectID &object_id, c_bool *has_object)
-6
View File
@@ -176,12 +176,6 @@ cdef class ObjectID(BaseID):
def task_id(self):
return TaskID(self.data.TaskId().Binary())
def set_buffer_ref(self, ref):
self.buffer_ref = ref
def get_buffer_ref(self):
return self.buffer_ref
cdef size_t hash(self):
return self.data.Hash()
+1 -36
View File
@@ -5,41 +5,7 @@ from __future__ import print_function
import ray.worker
from ray import profiling
__all__ = ["free", "pin_object_data"]
def pin_object_data(object_id):
"""Pin the object data referenced by this object id in memory.
The object data cannot be evicted while there exists a Python reference to
the object id passed to this function. In order to pin the object, we will
also download the object to the current node (this overhead is unavoidable
for now without a distributed ref counting solution).
Examples:
>>> x_id = f.remote()
>>> x_id = pin_object_id(x_id) # x pinned, cannot be evicted
>>> del x_id # x can be evicted again
Note that ray will automatically do this for objects created with
ray.put() already, unless you ray.put with weakref=True.
"""
worker = ray.worker.get_global_worker()
object_id.set_buffer_ref(
worker.core_worker.get_objects([object_id], worker.current_task_id))
def unpin_object_data(object_id):
"""Unpin an object pinned by pin_object_id.
Examples:
>>> x_id = f.remote()
>>> pin_object_id(x_id)
>>> unpin_object_id(x_id) # as if the pin didn't happen
"""
object_id.set_buffer_ref(None)
__all__ = ["free"]
def free(object_ids, local_only=False, delete_creating_tasks=False):
@@ -81,7 +47,6 @@ def free(object_ids, local_only=False, delete_creating_tasks=False):
if not isinstance(object_id, ray.ObjectID):
raise TypeError("Attempting to call `free` on the value {}, "
"which is not an ray.ObjectID.".format(object_id))
unpin_object_data(object_id)
if ray.worker._mode() == ray.worker.LOCAL_MODE:
worker.local_mode_manager.free(object_ids)
+3 -3
View File
@@ -188,9 +188,9 @@ py_test(
)
py_test(
name = "test_garbage_collection",
size = "small",
srcs = ["test_garbage_collection.py"],
name = "test_reference_counting",
size = "medium",
srcs = ["test_reference_counting.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
+12 -14
View File
@@ -8,6 +8,7 @@ import logging
import os
import setproctitle
import shutil
import json
import sys
import socket
import subprocess
@@ -418,7 +419,12 @@ def test_initialized_local_mode(shutdown_only_with_initialization_check):
def test_wait_reconstruction(shutdown_only):
ray.init(num_cpus=1, object_store_memory=int(10**8))
ray.init(
num_cpus=1,
object_store_memory=int(10**8),
_internal_config=json.dumps({
"object_pinning_enabled": 0
}))
@ray.remote
def f():
@@ -577,21 +583,21 @@ def test_shutdown_disconnect_global_state():
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_put_pins_object(ray_start_object_store_memory):
x_id = ray.put("HI")
x_copy = ray.ObjectID(x_id.binary())
assert ray.get(x_copy) == "HI"
x_binary = x_id.binary()
assert ray.get(ray.ObjectID(x_binary)) == "HI"
# x cannot be evicted since x_id pins it
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
assert ray.get(x_id) == "HI"
assert ray.get(x_copy) == "HI"
assert ray.get(ray.ObjectID(x_binary)) == "HI"
# now it can be evicted since x_id pins it but x_copy does not
# now it can be evicted since x_id pins it but x_binary does not
del x_id
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(x_copy)
ray.get(ray.ObjectID(x_binary))
# weakref put
y_id = ray.put("HI", weakref=True)
@@ -600,14 +606,6 @@ def test_put_pins_object(ray_start_object_store_memory):
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(y_id)
@ray.remote
def check_no_buffer_ref(x):
assert x[0].get_buffer_ref() is None
z_id = ray.put("HI")
assert z_id.get_buffer_ref() is not None
ray.get(check_no_buffer_ref.remote([z_id]))
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
+5 -4
View File
@@ -936,11 +936,10 @@ def test_direct_call_serialized_id_eviction(ray_start_cluster):
@ray.remote
def get(obj_ids):
print("get", obj_ids)
obj_id = obj_ids[0]
assert (isinstance(ray.get(obj_id), np.ndarray))
# Evict the object.
ray.internal.free(obj_ids)
# Wait for the object to be evicted.
ray.internal.free(obj_id)
while ray.worker.global_worker.core_worker.object_exists(obj_id):
time.sleep(1)
with pytest.raises(ray.exceptions.UnreconstructableError):
@@ -948,7 +947,9 @@ def test_direct_call_serialized_id_eviction(ray_start_cluster):
print("get done", obj_ids)
obj = large_object.remote()
ray.get(get.remote([obj]))
result = get.remote([obj])
ray.internal.free(obj)
ray.get(result)
@pytest.mark.parametrize(
@@ -1,86 +0,0 @@
# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import numpy as np
import time
import logging
import pytest
import ray
import ray.cluster_utils
import ray.test_utils
logger = logging.getLogger(__name__)
def test_basic_gc(shutdown_only):
ray.init(
object_store_memory=100 * 1024 * 1024,
use_pickle=True,
_internal_config=json.dumps({
"worker_heartbeat_timeout_milliseconds": 500,
"raylet_max_active_object_ids": 1000
}))
@ray.remote
def shuffle(input):
return np.random.shuffle(input)
@ray.remote
class Actor:
def __init__(self):
# Hold a long-lived reference to a ray.put object. This should not
# be garbage collected while the actor is alive.
self.large_object = ray.put(
np.zeros(25 * 1024 * 1024, dtype=np.uint8), weakref=True)
def get_large_object(self):
return ray.get(self.large_object)
actor = Actor.remote()
# Fill up the object store with short-lived objects. These should be
# evicted before the long-lived object whose reference is held by
# the actor.
for batch in range(10):
intermediate_result = shuffle.remote(
np.zeros(10 * 1024 * 1024, dtype=np.uint8))
ray.get(intermediate_result)
# The ray.get below would fail with only LRU eviction, as the object
# that was ray.put by the actor would have been evicted.
ray.get(actor.get_large_object.remote())
@pytest.mark.skip(reason="This test currently fails on Travis.")
def test_pending_task_dependency(shutdown_only):
ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True)
@ray.remote
def pending(input1, input2):
return
@ray.remote
def slow():
time.sleep(5)
# The object that is ray.put here will go out of scope immediately, so if
# pending task dependencies aren't considered, it will be evicted before
# the ray.get below due to the subsequent ray.puts that fill up the object
# store.
np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8)
oid = pending.remote(ray.put(np_array), slow.remote())
for _ in range(2):
ray.put(np_array)
ray.get(oid)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
+2 -6
View File
@@ -15,7 +15,7 @@ class LightActor:
pass
def sample(self):
return np.zeros(1 * MB, dtype=np.uint8)
return np.zeros(5 * MB, dtype=np.uint8)
@ray.remote
@@ -29,9 +29,8 @@ class GreedyActor:
class TestMemoryLimits(unittest.TestCase):
def testWithoutQuota(self):
self._run(100 * MB, None, None)
self.assertRaises(OBJECT_EVICTED, lambda: self._run(None, None, None))
self.assertRaises(OBJECT_EVICTED,
lambda: self._run(100 * MB, None, None))
self.assertRaises(OBJECT_EVICTED,
lambda: self._run(None, 100 * MB, None))
@@ -66,14 +65,11 @@ class TestMemoryLimits(unittest.TestCase):
z = ray.put("hi", weakref=True)
a = LightActor._remote(object_store_memory=a_quota)
b = GreedyActor._remote(object_store_memory=b_quota)
oids = [z]
for _ in range(5):
r_a = a.sample.remote()
for _ in range(20):
new_oid = b.sample.remote()
oids.append(new_oid)
ray.get(new_oid)
oids.append(r_a)
ray.get(r_a)
ray.get(z)
except Exception as e:
+96 -2
View File
@@ -4,10 +4,12 @@ from __future__ import division
from __future__ import print_function
import os
import json
import copy
import tempfile
import numpy as np
import time
import pytest
import logging
import uuid
@@ -27,7 +29,7 @@ def _check_refcounts(expected):
assert submitted == actual[object_id]["submitted"]
def check_refcounts(expected, timeout=1):
def check_refcounts(expected, timeout=10):
start = time.time()
while True:
try:
@@ -156,7 +158,99 @@ def test_dependency_refcounts(ray_start_regular):
check_refcounts({})
def test_basic_pinning(shutdown_only):
ray.init(object_store_memory=100 * 1024 * 1024)
@ray.remote
def f(array):
return np.sum(array)
@ray.remote
class Actor(object):
def __init__(self):
# Hold a long-lived reference to a ray.put object's ID. The object
# should not be garbage collected while the actor is alive because
# the object is pinned by the raylet.
self.large_object = ray.put(
np.zeros(25 * 1024 * 1024, dtype=np.uint8))
def get_large_object(self):
return ray.get(self.large_object)
actor = Actor.remote()
# Fill up the object store with short-lived objects. These should be
# evicted before the long-lived object whose reference is held by
# the actor.
for batch in range(10):
intermediate_result = f.remote(
np.zeros(10 * 1024 * 1024, dtype=np.uint8))
ray.get(intermediate_result)
# The ray.get below would fail with only LRU eviction, as the object
# that was ray.put by the actor would have been evicted.
ray.get(actor.get_large_object.remote())
def test_pending_task_dependency_pinning(shutdown_only):
ray.init(object_store_memory=100 * 1024 * 1024, use_pickle=True)
@ray.remote
def pending(input1, input2):
return
@ray.remote
def slow(dep):
pass
# The object that is ray.put here will go out of scope immediately, so if
# pending task dependencies aren't considered, it will be evicted before
# the ray.get below due to the subsequent ray.puts that fill up the object
# store.
np_array = np.zeros(40 * 1024 * 1024, dtype=np.uint8)
random_id = ray.ObjectID.from_random()
oid = pending.remote(np_array, slow.remote(random_id))
for _ in range(2):
ray.put(np_array)
ray.worker.global_worker.put_object(None, object_id=random_id)
ray.get(oid)
def test_feature_flag(shutdown_only):
ray.init(
object_store_memory=100 * 1024 * 1024,
_internal_config=json.dumps({
"object_pinning_enabled": 0
}))
@ray.remote
def f(array):
return np.sum(array)
@ray.remote
class Actor(object):
def __init__(self):
self.large_object = ray.put(
np.zeros(25 * 1024 * 1024, dtype=np.uint8))
def get_large_object(self):
return ray.get(self.large_object)
actor = Actor.remote()
for batch in range(10):
intermediate_result = f.remote(
np.zeros(10 * 1024 * 1024, dtype=np.uint8))
ray.get(intermediate_result)
# The ray.get below fails with only LRU eviction, as the object
# that was ray.put by the actor should have been evicted.
with pytest.raises(ray.exceptions.RayTimeoutError):
ray.get(actor.get_large_object.remote(), timeout=1)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
+4 -13
View File
@@ -242,7 +242,7 @@ class Worker:
"""
self.mode = mode
def put_object(self, value, object_id=None):
def put_object(self, value, object_id=None, pin_object=True):
"""Put value in the local object store with object id `objectid`.
This assumes that the value for `objectid` has not yet been placed in
@@ -256,6 +256,7 @@ class Worker:
value: The value to put in the object store.
object_id (object_id.ObjectID): The object ID of the value to be
put. If None, one will be generated.
pin_object: If set, the object will be pinned at the raylet.
Returns:
object_id.ObjectID: The object ID the object was put under.
@@ -276,7 +277,7 @@ class Worker:
serialized_value = self.get_serialization_context().serialize(value)
return self.core_worker.put_serialized_object(
serialized_value, object_id=object_id)
serialized_value, object_id=object_id, pin_object=pin_object)
def deserialize_objects(self,
data_metadata_pairs,
@@ -1519,7 +1520,7 @@ def put(value, weakref=False):
object_id = worker.local_mode_manager.put_object(value)
else:
try:
object_id = worker.put_object(value)
object_id = worker.put_object(value, pin_object=not weakref)
except ObjectStoreFullError:
logger.info(
"Put failed since the value was either too large or the "
@@ -1528,16 +1529,6 @@ def put(value, weakref=False):
"ray.put(value, weakref=True) to allow object data to "
"be evicted early.")
raise
# Pin the object buffer with the returned id. This avoids put returns
# from getting evicted out from under the id.
# TODO(edoakes): we should be able to avoid this extra IPC by holding
# a reference to the buffer created when putting the object, but the
# buffer returned by the plasma store create method doesn't prevent
# the object from being evicted.
if not weakref and not worker.mode == LOCAL_MODE:
object_id.set_buffer_ref(
worker.core_worker.get_objects([object_id],
worker.current_task_id))
return object_id