Distributed ref counting for serialized ObjectIDs (#6945)

* Skeleton plus a unit test for simple borrower case

* First unit test passes - forward an ID and task returns with 1 submitted task pending on the inner ID

* Invariant for contained_in

* Unit test passes for testing task return without creating a borrower

* Wrap ref count functionality in test case

* Fix bad delete

* Unit test and fix for borrowers creating more borrowers

* Unit test and fix for simple borrowing, but owner sends call after borrower's ref count goes to 0

* Refactor:
- keep a sentinel ref count for task argument IDs
- keep contained_in_borrowed in addition to contained_in_owned

* Unit test for nested IDs passes

* Refactor so that an object ID can only be contained in 1 borrowed ID at a time

* Add check

* Fix

* Unit test (passes) to test nesting object IDs but no borrowers created

* Unit test for nested objects from different owners passes, refactor to unset contained_in when popping refs

* Unit tests for borrowers receiving an ObjectID from multiple sources,
skip adding ownership info if we already have it to handle duplicate
refs

* Unit test for returning object ID passes

* More unit tests for returning object IDs pass

* Add serialized ID tests

* fix serialization issue

* remove swap

* It builds!

* debugging and some fixes:
- register handler for WaitForRefRemoved
- don't create a python reference for arg IDs
- pass in client factory into ReferenceCounter
- fix bad decrement in PopBorrowerRefs

* Fix accounting for serialized IDs:
- don't decrement for IDs on dependency resolution, wait until task finished
- add object IDs that were inlined when building the arguments to the task spec, pin these on the task executor until task finishes

* mu_ -> mutex_

* lint

* fix build

* clear outer_object_id

* add direct call type check

* Fix test for direct call IDs and return IDs for actor calls

* Fix CoreWorkerClient.Addr()

* Remove unneeded lock

* Remove unnecessary ObjectID refs

* Fix worker holding serialized refs test

* Fix hex IDs

* fix

* fix tests

* fix tests

* refactor and cleanups

* lint

* Put inlined Ids in task args and some cleanup

* Add back gc.collect() line for test case

* Refactor and fixes:
- store inlined IDs in RayObject
- allow storing objects with inlined IDs in memory store
- pin objects that were promoted to plasma

* oops

* make sure worker ID is set in address, pass in rpc::Address to CoreWorkerClient

* todos

* cleanups and test builds

* Fix tests

* Add feature flag

* cleanups

* address comments and some cleanups

* cleanup

* fix recursive test

* Comments for tests

* Turn off ref counting by default

* Skip tests

* Fix some bugs for test_array.py, java build

* Don't include nested objects in the ref count when the feature flag is off

* C++ feature flag does not work...

* Remove

* Turn on python tests and add a warning when plasma objects are evicted before being pinned

* Fix build and remove irrelevant test

* Fix for java

* Revert "Fix build and remove irrelevant test"

This reverts commit 056cca9b263ed05b0f9ab2250907338edcbca2d5.

* Fix ray.internal.free

* Fixes and skip some flaky tests

* fix java build

* fix windows build

* Add IDs contained in owned objects

* Update src/ray/protobuf/core_worker.proto

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Update src/ray/core_worker/reference_count.cc

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Update src/ray/protobuf/core_worker.proto

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Update src/ray/protobuf/core_worker.proto

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Update src/ray/core_worker/reference_count.h

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Update src/ray/core_worker/reference_count.h

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Update src/ray/core_worker/reference_count.cc

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* Apply suggestions from code review

Co-Authored-By: Edward Oakes <ed.nmi.oakes@gmail.com>

* update

* Try to fix ::test_direct_call_serialized_id_eviction

Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This commit is contained in:
Stephanie Wang
2020-02-18 18:21:34 -08:00
committed by GitHub
parent 4a12243336
commit f76ce836b2
43 changed files with 2639 additions and 305 deletions
+14 -6
View File
@@ -278,6 +278,8 @@ cdef void prepare_args(
size_t size
int64_t put_threshold
shared_ptr[CBuffer] arg_data
c_vector[CObjectID] inlined_ids
ObjectID obj_id
worker = ray.worker.global_worker
put_threshold = RayConfig.instance().max_direct_call_object_size()
@@ -294,14 +296,17 @@ cdef void prepare_args(
# plasma here. This is inefficient for small objects, but inlined
# arguments aren't associated ObjectIDs right now so this is a
# simple fix for reference counting purposes.
if (<int64_t>size <= put_threshold and
len(serialized_arg.contained_object_ids) == 0):
if <int64_t>size <= put_threshold:
arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer](
make_shared[LocalMemoryBuffer](size))
write_serialized_object(serialized_arg, arg_data)
for obj_id in serialized_arg.contained_object_ids:
inlined_ids.push_back(obj_id.native())
args_vector.push_back(
CTaskArg.PassByValue(make_shared[CRayObject](
arg_data, string_to_buffer(serialized_arg.metadata))))
arg_data, string_to_buffer(serialized_arg.metadata),
inlined_ids)))
inlined_ids.clear()
else:
args_vector.push_back(
CTaskArg.PassByReference((CObjectID.FromBinary(
@@ -664,7 +669,7 @@ cdef class CoreWorker:
c_object_id[0] = object_id.native()
with nogil:
check_status(self.core_worker.get().Create(
metadata, data_size, contained_ids,
metadata, data_size,
c_object_id[0], data))
break
except ObjectStoreFullError as e:
@@ -979,15 +984,18 @@ cdef class CoreWorker:
c_owner_address.SerializeAsString())
def deserialize_and_register_object_id(
self, const c_string &object_id_binary, const c_string
&owner_id_binary, const c_string &serialized_owner_address):
self, const c_string &object_id_binary, ObjectID outer_object_id,
const c_string &owner_id_binary,
const c_string &serialized_owner_address):
cdef:
CObjectID c_object_id = CObjectID.FromBinary(object_id_binary)
CObjectID c_outer_object_id = outer_object_id.native()
CTaskID c_owner_id = CTaskID.FromBinary(owner_id_binary)
CAddress c_owner_address = CAddress()
c_owner_address.ParseFromString(serialized_owner_address)
self.core_worker.get().RegisterOwnershipInfoAndResolveFuture(
c_object_id,
c_outer_object_id,
c_owner_id,
c_owner_address)
+4 -3
View File
@@ -142,8 +142,10 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CTaskID *owner_id,
CAddress *owner_address)
void RegisterOwnershipInfoAndResolveFuture(
const CObjectID &object_id, const CTaskID &owner_id, const
CAddress &owner_address)
const CObjectID &object_id,
const CObjectID &outer_object_id,
const CTaskID &owner_id,
const CAddress &owner_address)
void AddContainedObjectIDs(
const CObjectID &object_id,
const c_vector[CObjectID] &contained_object_ids)
@@ -161,7 +163,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CObjectID *object_id, shared_ptr[CBuffer] *data)
CRayStatus Create(const shared_ptr[CBuffer] &metadata,
const size_t data_size,
const c_vector[CObjectID] &contained_object_ids,
const CObjectID &object_id,
shared_ptr[CBuffer] *data)
CRayStatus Seal(const CObjectID &object_id, c_bool pin_object)
+17 -13
View File
@@ -156,6 +156,8 @@ class SerializationContext:
self.add_contained_object_id(obj)
owner_id = ""
owner_address = ""
# TODO(swang): Remove this check. Otherwise, we will not be able to
# handle serialized plasma IDs correctly.
if obj.is_direct_call_type():
worker = ray.worker.get_global_worker()
worker.check_connected()
@@ -176,14 +178,14 @@ class SerializationContext:
# to 'self' here instead, but this function is itself pickled
# somewhere, which causes an error.
context = ray.worker.global_worker.get_serialization_context()
context.add_contained_object_id(deserialized_object_id)
if owner_id:
worker = ray.worker.get_global_worker()
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
outer_id = context.get_outer_object_id()
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], owner_id[1][0], owner_address)
obj_id[1][0], outer_id, owner_id[1][0], owner_address)
return deserialized_object_id
for id_type in ray._raylet._ID_TYPES:
@@ -204,6 +206,12 @@ class SerializationContext:
# construct a reducer
pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer
def set_outer_object_id(self, outer_object_id):
self._thread_local.outer_object_id = outer_object_id
def get_outer_object_id(self):
return getattr(self._thread_local, "outer_object_id", None)
def get_and_clear_contained_object_ids(self):
if not hasattr(self._thread_local, "object_ids"):
self._thread_local.object_ids = set()
@@ -235,18 +243,8 @@ class SerializationContext:
# cloudpickle does not provide error types
except pickle.pickle.PicklingError:
raise DeserializationError()
# Check that there are no ObjectIDs serialized in arguments
# that are inlined.
if object_id.is_nil():
assert len(self.get_and_clear_contained_object_ids()) == 0
else:
worker = ray.worker.global_worker
worker.core_worker.add_contained_object_ids(
object_id,
self.get_and_clear_contained_object_ids(),
)
return obj
# Check if the object should be returned as raw bytes.
if metadata == ray_constants.RAW_BUFFER_METADATA:
if data is None:
@@ -287,6 +285,8 @@ class SerializationContext:
while i < len(object_ids):
object_id = object_ids[i]
data, metadata = data_metadata_pairs[i]
assert self.get_outer_object_id() is None
self.set_outer_object_id(object_id)
try:
results.append(
self._deserialize_object(data, metadata, object_id))
@@ -310,6 +310,9 @@ class SerializationContext:
warning_message,
job_id=self.worker.current_job_id)
warning_sent = True
finally:
# Must clear ObjectID to not hold a reference.
self.set_outer_object_id(None)
return results
@@ -328,6 +331,7 @@ class SerializationContext:
assert self.worker.use_pickle
assert ray.cloudpickle.FAST_CLOUDPICKLE_USED
writer = Pickle5Writer()
# TODO(swang): Check that contained_object_ids is empty.
inband = pickle.dumps(
value, protocol=5, buffer_callback=writer.buffer_callback)
return Pickle5SerializedObject(
+33 -14
View File
@@ -8,6 +8,7 @@ import time
import pytest
import logging
import uuid
import gc
import ray
import ray.cluster_utils
@@ -18,7 +19,13 @@ logger = logging.getLogger(__name__)
@pytest.fixture
def one_worker_100MiB(request):
yield ray.init(num_cpus=1, object_store_memory=100 * 1024 * 1024)
config = json.dumps({
"distributed_ref_counting_enabled": 1,
})
yield ray.init(
num_cpus=1,
object_store_memory=100 * 1024 * 1024,
_internal_config=config)
ray.shutdown()
@@ -266,7 +273,6 @@ def test_feature_flag(shutdown_only):
# Remote function takes serialized reference and doesn't hold onto it after
# finishing. Referenced object shouldn't be evicted while the task is pending
# and should be evicted after it returns.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_basic_serialized_reference(one_worker_100MiB):
@ray.remote
def pending(ref, dep):
@@ -286,6 +292,9 @@ def test_basic_serialized_reference(one_worker_100MiB):
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Needed due to Python GC issue in cloudpickle.
# https://github.com/cloudpipe/cloudpickle/issues/343
gc.collect()
# Check that the remote reference pins the object.
_fill_object_store_and_get(array_oid_bytes)
@@ -301,7 +310,8 @@ def test_basic_serialized_reference(one_worker_100MiB):
# Call a recursive chain of tasks that pass a serialized reference to the end
# of the chain. The reference should still exist while the final task in the
# chain is running and should be removed once it finishes.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_recursive_serialized_reference(one_worker_100MiB):
@ray.remote
def recursive(ref, dep, max_depth, depth=0):
@@ -325,7 +335,7 @@ def test_recursive_serialized_reference(one_worker_100MiB):
del array_oid
tail_oid = head_oid
for _ in range(max_depth - 1):
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.
@@ -333,7 +343,7 @@ def test_recursive_serialized_reference(one_worker_100MiB):
# Fulfill the dependency, causing the tail task to finish.
ray.worker.global_worker.put_object(None, object_id=random_oid)
ray.get(tail_oid)
assert ray.get(tail_oid) is None
# Reference should be gone, check that array gets evicted.
_fill_object_store_and_get(array_oid_bytes, succeed=False)
@@ -342,7 +352,6 @@ def test_recursive_serialized_reference(one_worker_100MiB):
# Test that a passed reference held by an actor after the method finishes
# is kept until the reference is removed from the actor. Also tests giving
# the actor a duplicate reference to the same object ID.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_actor_holding_serialized_reference(one_worker_100MiB):
@ray.remote
class GreedyActor(object):
@@ -376,6 +385,9 @@ def test_actor_holding_serialized_reference(one_worker_100MiB):
# Remove the local reference.
array_oid_bytes = array_oid.binary()
del array_oid
# Needed due to Python GC issue in cloudpickle.
# https://github.com/cloudpipe/cloudpickle/issues/343
gc.collect()
# Test that the remote references still pin the object.
_fill_object_store_and_get(array_oid_bytes)
@@ -392,7 +404,8 @@ def test_actor_holding_serialized_reference(one_worker_100MiB):
# Test that a passed reference held by an actor after a task finishes
# is kept until the reference is removed from the worker. Also tests giving
# the worker a duplicate reference to the same object ID.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_worker_holding_serialized_reference(one_worker_100MiB):
@ray.remote
def child(dep1, dep2):
@@ -428,7 +441,6 @@ def test_worker_holding_serialized_reference(one_worker_100MiB):
# Test that an object containing object IDs within it pins the inner IDs.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
def test_basic_nested_ids(one_worker_100MiB):
inner_oid = ray.put(np.zeros(40 * 1024 * 1024, dtype=np.uint8))
outer_oid = ray.put([inner_oid])
@@ -436,6 +448,9 @@ def test_basic_nested_ids(one_worker_100MiB):
# Remove the local reference to the inner object.
inner_oid_bytes = inner_oid.binary()
del inner_oid
# Needed due to Python GC issue in cloudpickle.
# https://github.com/cloudpipe/cloudpickle/issues/343
gc.collect()
# Check that the outer reference pins the inner object.
_fill_object_store_and_get(inner_oid_bytes)
@@ -447,7 +462,8 @@ def test_basic_nested_ids(one_worker_100MiB):
# Test that an object containing object IDs within it pins the inner IDs
# recursively and for submitted tasks.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_recursively_nest_ids(one_worker_100MiB):
@ray.remote
def recursive(ref, dep, max_depth, depth=0):
@@ -474,7 +490,7 @@ def test_recursively_nest_ids(one_worker_100MiB):
del array_oid, nested_oid
tail_oid = head_oid
for _ in range(max_depth - 1):
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.
@@ -490,7 +506,8 @@ def test_recursively_nest_ids(one_worker_100MiB):
# Test that serialized objectIDs returned from remote tasks are pinned until
# they go out of scope on the caller side.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_return_object_id(one_worker_100MiB):
@ray.remote
def put():
@@ -519,7 +536,8 @@ def test_return_object_id(one_worker_100MiB):
# Test that serialized objectIDs returned from remote tasks are pinned if
# passed into another remote task by the caller.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_pass_returned_object_id(one_worker_100MiB):
@ray.remote
def put():
@@ -555,7 +573,8 @@ def test_pass_returned_object_id(one_worker_100MiB):
# returned by another task to the end of the chain. The reference should still
# exist while the final task in the chain is running and should be removed once
# it finishes.
@pytest.mark.skip("Serialized ObjectID reference counting not implemented.")
@pytest.mark.skip("Memory not freed due to Python GC issue in cloudpickle "
"(https://github.com/cloudpipe/cloudpickle/issues/343).")
def test_recursively_pass_returned_object_id(one_worker_100MiB):
@ray.remote
def put():
@@ -583,7 +602,7 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB):
del outer_oid
tail_oid = head_oid
for _ in range(max_depth - 1):
for _ in range(max_depth):
tail_oid = ray.get(tail_oid)
# Check that the remote reference pins the object.