diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 33e6c61e6..bb76a8018 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -177,10 +177,9 @@ cdef c_vector[CObjectID] ObjectIDsToVector(object_ids): The output vector. """ cdef: - ObjectID object_id c_vector[CObjectID] result for object_id in object_ids: - result.push_back(object_id.native()) + result.push_back((object_id).native()) return result @@ -267,7 +266,6 @@ cdef void prepare_args( int64_t put_threshold shared_ptr[CBuffer] arg_data c_vector[CObjectID] inlined_ids - ObjectID obj_id worker = ray.worker.global_worker put_threshold = RayConfig.instance().max_direct_call_object_size() @@ -288,8 +286,8 @@ cdef void prepare_args( arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer]( make_shared[LocalMemoryBuffer](size)) write_serialized_object(serialized_arg, arg_data) - for obj_id in serialized_arg.contained_object_ids: - inlined_ids.push_back(obj_id.native()) + for object_id in serialized_arg.contained_object_ids: + inlined_ids.push_back((object_id).native()) args_vector.push_back( CTaskArg.PassByValue(make_shared[CRayObject]( arg_data, string_to_buffer(serialized_arg.metadata), @@ -298,7 +296,7 @@ cdef void prepare_args( else: args_vector.push_back( CTaskArg.PassByReference((CObjectID.FromBinary( - core_worker.put_serialized_cobject(serialized_arg))))) + core_worker.put_serialized_object(serialized_arg))))) cdef deserialize_args( const c_vector[shared_ptr[CRayObject]] &c_args, @@ -694,12 +692,6 @@ cdef class CoreWorker: def put_serialized_object(self, serialized_object, ObjectID object_id=None, c_bool pin_object=True): - return ObjectID(self.put_serialized_cobject( - serialized_object, object_id, pin_object)) - - def put_serialized_cobject(self, serialized_object, - ObjectID object_id=None, - c_bool pin_object=True): cdef: CObjectID c_object_id shared_ptr[CBuffer] data @@ -916,7 +908,9 @@ cdef class CoreWorker: extra_data) def deserialize_and_register_actor_handle(self, const c_string &bytes): - cdef CActorHandle* c_actor_handle + cdef: + CActorHandle* c_actor_handle + worker = ray.worker.get_global_worker() worker.check_connected() manager = worker.function_actor_manager @@ -996,6 +990,7 @@ cdef class CoreWorker: CObjectID c_outer_object_id = outer_object_id.native() CTaskID c_owner_id = CTaskID.FromBinary(owner_id_binary) CAddress c_owner_address = CAddress() + c_owner_address.ParseFromString(serialized_owner_address) self.core_worker.get().RegisterOwnershipInfoAndResolveFuture( c_object_id, diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index f99be4522..5e127dc1b 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -40,8 +40,9 @@ class PlasmaEventHandler: """Process notifications.""" for object_id, object_size, metadata_size in messages: if object_size > 0 and object_id in self._waiting_dict: - # This must be asynchronous to allow objects to be locally - # received + # This must be asynchronous because it runs on the main IO + # thread in the worker. If this is blocked, other messages + # won't be received. self._loop.call_soon_threadsafe(_complete_future, self, object_id) diff --git a/python/ray/includes/unique_ids.pxd b/python/ray/includes/unique_ids.pxd index 012222a2a..81bb0a806 100644 --- a/python/ray/includes/unique_ids.pxd +++ b/python/ray/includes/unique_ids.pxd @@ -152,6 +152,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil: CObjectID WithPlasmaTransportType() + CObjectID WithDirectTransportType() + int64_t ObjectIndex() const CTaskID TaskId() const diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 6a54b7846..ea9f298d6 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -185,7 +185,7 @@ cdef class ObjectID(BaseID): @classmethod def from_random(cls): - return cls(CObjectID.FromRandom().Binary()) + return cls(CObjectID.FromRandom().WithDirectTransportType().Binary()) def __await__(self): # Delayed import because this can only be imported in py3. diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 79cf548f6..2d5cd561d 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -192,6 +192,10 @@ class SerializationContext: # UniqueIDs are serialized as # (class name, (unique bytes,)). outer_id = context.get_outer_object_id() + # outer_id is None in the case that this ObjectID was closed + # over in a function or pickled directly using pickle.dumps(). + if outer_id is None: + outer_id = ray.ObjectID.nil() worker.core_worker.deserialize_and_register_object_id( obj_id[1][0], outer_id, owner_id[1][0], owner_address) return deserialized_object_id diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index c5a6b66ca..65be5fd98 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -235,3 +235,15 @@ class RemoteSignal: def wait(self): ray.get(self.signal_actor.wait.remote()) + + +@ray.remote +def _put(obj): + return obj + + +def put_object(obj, use_ray_put): + if use_ray_put: + return ray.put(obj) + else: + return _put.remote(obj) diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 1956f7d20..6b693f002 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -527,8 +527,8 @@ def test_put_pins_object(ray_start_object_store_memory): del x_id for _ in range(10): ray.put(np.zeros(10 * 1024 * 1024)) - with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(ray.ObjectID(x_binary)) + assert not ray.worker.global_worker.core_worker.object_exists( + ray.ObjectID(x_binary)) # weakref put y_id = ray.put("HI", weakref=True) diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index 3872cf193..eb6308df5 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -163,19 +163,18 @@ def test_asyncio_actor_async_get(ray_start_regular_shared): def remote_task(): return 1 - plasma_object = ray.put(2) - @ray.remote class AsyncGetter: async def get(self): return await remote_task.remote() - async def plasma_get(self): - return await plasma_object + async def plasma_get(self, plasma_object): + return await plasma_object[0] - getter = AsyncGetter.options().remote() + plasma_object = ray.put(2) + getter = AsyncGetter.remote() assert ray.get(getter.get.remote()) == 1 - assert ray.get(getter.plasma_get.remote()) == 2 + assert ray.get(getter.plasma_get.remote([plasma_object])) == 2 if __name__ == "__main__": diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index fd521c79f..ba266b60b 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -841,10 +841,15 @@ def test_raylet_crash_when_get(ray_start_regular): time.sleep(2) ray.worker._global_node.kill_raylet() + object_id = ray.put(None) + ray.internal.free(object_id) + while ray.worker.global_worker.core_worker.object_exists(object_id): + time.sleep(1) + thread = threading.Thread(target=sleep_to_kill_raylet) thread.start() with pytest.raises(ray.exceptions.UnreconstructableError): - ray.get(ray.ObjectID.from_random()) + ray.get(object_id) thread.join() diff --git a/python/ray/tests/test_reference_counting.py b/python/ray/tests/test_reference_counting.py index 64d7c3dec..2f317b850 100644 --- a/python/ray/tests/test_reference_counting.py +++ b/python/ray/tests/test_reference_counting.py @@ -1,4 +1,5 @@ # coding: utf-8 +import asyncio import copy import json import logging @@ -12,7 +13,7 @@ import pytest import ray import ray.cluster_utils -from ray.test_utils import SignalActor, wait_for_condition +from ray.test_utils import SignalActor, put_object, wait_for_condition from ray.internal.internal_api import global_gc logger = logging.getLogger(__name__) @@ -372,19 +373,14 @@ def test_feature_flag(shutdown_only): # Remote function takes serialized reference and doesn't hold onto it after # finishing. Referenced object shouldn't be evicted while the task is pending # and should be evicted after it returns. -def test_basic_serialized_reference(one_worker_100MiB): +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_basic_serialized_reference(one_worker_100MiB, use_ray_put): @ray.remote def pending(ref, dep): ray.get(ref[0]) - # TODO(edoakes): currently these tests don't work with ray.put() so we need - # to return from a task like this instead. Once that is fixed, should have - # tests run with both codepaths. - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - - array_oid = put.remote() + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) signal = SignalActor.remote() oid = pending.remote([array_oid], signal.wait.remote()) @@ -406,7 +402,19 @@ def test_basic_serialized_reference(one_worker_100MiB): # Call a recursive chain of tasks that pass a serialized reference to the end # of the chain. The reference should still exist while the final task in the # chain is running and should be removed once it finishes. -def test_recursive_serialized_reference(one_worker_100MiB): +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_recursive_serialized_reference(one_worker_100MiB, use_ray_put): + @ray.remote(num_cpus=0) + class Signal: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self): + self.ready_event.set() + + async def wait(self): + await self.ready_event.wait() + @ray.remote def recursive(ref, signal, max_depth, depth=0): ray.get(ref[0]) @@ -415,14 +423,11 @@ def test_recursive_serialized_reference(one_worker_100MiB): else: return recursive.remote(ref, signal, max_depth, depth + 1) - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - signal = SignalActor.remote() max_depth = 5 - array_oid = put.remote() + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) head_oid = recursive.remote([array_oid], signal, max_depth) # Remove the local reference. @@ -447,7 +452,8 @@ def test_recursive_serialized_reference(one_worker_100MiB): # Test that a passed reference held by an actor after the method finishes # is kept until the reference is removed from the actor. Also tests giving # the actor a duplicate reference to the same object ID. -def test_actor_holding_serialized_reference(one_worker_100MiB): +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_actor_holding_serialized_reference(one_worker_100MiB, use_ray_put): @ray.remote class GreedyActor(object): def __init__(self): @@ -465,12 +471,9 @@ def test_actor_holding_serialized_reference(one_worker_100MiB): def delete_ref2(self): self.ref2 = None - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - # Test that the reference held by the actor isn't evicted. - array_oid = put.remote() + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) actor = GreedyActor.remote() actor.set_ref1.remote([array_oid]) @@ -496,7 +499,19 @@ def test_actor_holding_serialized_reference(one_worker_100MiB): # Test that a passed reference held by an actor after a task finishes # is kept until the reference is removed from the worker. Also tests giving # the worker a duplicate reference to the same object ID. -def test_worker_holding_serialized_reference(one_worker_100MiB): +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_worker_holding_serialized_reference(one_worker_100MiB, use_ray_put): + @ray.remote(num_cpus=0) + class Signal: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self): + self.ready_event.set() + + async def wait(self): + await self.ready_event.wait() + @ray.remote def child(dep1, dep2): return @@ -505,14 +520,11 @@ def test_worker_holding_serialized_reference(one_worker_100MiB): def launch_pending_task(ref, signal): return child.remote(ref[0], signal.wait.remote()) - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - signal = SignalActor.remote() # Test that the reference held by the actor isn't evicted. - array_oid = put.remote() + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) child_return_id = ray.get(launch_pending_task.remote([array_oid], signal)) # Remove the local reference. @@ -548,7 +560,19 @@ def test_basic_nested_ids(one_worker_100MiB): # Test that an object containing object IDs within it pins the inner IDs # recursively and for submitted tasks. -def test_recursively_nest_ids(one_worker_100MiB): +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_recursively_nest_ids(one_worker_100MiB, use_ray_put): + @ray.remote(num_cpus=0) + class Signal: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self): + self.ready_event.set() + + async def wait(self): + await self.ready_event.wait() + @ray.remote def recursive(ref, signal, max_depth, depth=0): unwrapped = ray.get(ref[0]) @@ -557,14 +581,11 @@ def test_recursively_nest_ids(one_worker_100MiB): else: return recursive.remote(unwrapped, signal, max_depth, depth + 1) - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - signal = SignalActor.remote() max_depth = 5 - array_oid = put.remote() + array_oid = put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) nested_oid = array_oid for _ in range(max_depth): nested_oid = ray.put([nested_oid]) @@ -591,14 +612,14 @@ def test_recursively_nest_ids(one_worker_100MiB): # Test that serialized objectIDs returned from remote tasks are pinned until # they go out of scope on the caller side. -def test_return_object_id(one_worker_100MiB): - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_return_object_id(one_worker_100MiB, use_ray_put): @ray.remote def return_an_id(): - return [put.remote()] + return [ + put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + ] outer_oid = return_an_id.remote() inner_oid_binary = ray.get(outer_oid)[0].binary() @@ -619,14 +640,29 @@ def test_return_object_id(one_worker_100MiB): # Test that serialized objectIDs returned from remote tasks are pinned if # passed into another remote task by the caller. -def test_pass_returned_object_id(one_worker_100MiB): +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_pass_returned_object_id(one_worker_100MiB, use_ray_put): + @ray.remote(num_cpus=0) + class Signal: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self): + self.ready_event.set() + + async def wait(self): + await self.ready_event.wait() + @ray.remote def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) + return @ray.remote def return_an_id(): - return [put.remote()] + return [ + put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + ] @ray.remote def pending(ref, signal): @@ -654,14 +690,25 @@ def test_pass_returned_object_id(one_worker_100MiB): # returned by another task to the end of the chain. The reference should still # exist while the final task in the chain is running and should be removed once # it finishes. -def test_recursively_pass_returned_object_id(one_worker_100MiB): - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_recursively_pass_returned_object_id(one_worker_100MiB, use_ray_put): + @ray.remote(num_cpus=0) + class Signal: + def __init__(self): + self.ready_event = asyncio.Event() + + def send(self): + self.ready_event.set() + + async def wait(self): + await self.ready_event.wait() @ray.remote def return_an_id(): - return [put.remote()] + return [ + put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) + ] @ray.remote def recursive(ref, signal, max_depth, depth=0): @@ -700,15 +747,13 @@ def test_recursively_pass_returned_object_id(one_worker_100MiB): # returns the same ObjectID by calling ray.get() on its submitted task and # returning the result. The reference should still exist while the driver has a # reference to the final task's ObjectID. -def test_recursively_return_borrowed_object_id(one_worker_100MiB): - @ray.remote - def put(): - return np.zeros(40 * 1024 * 1024, dtype=np.uint8) - +@pytest.mark.parametrize("use_ray_put", [False, True]) +def test_recursively_return_borrowed_object_id(one_worker_100MiB, use_ray_put): @ray.remote def recursive(num_tasks_left): if num_tasks_left == 0: - return put.remote() + return put_object( + np.zeros(40 * 1024 * 1024, dtype=np.uint8), use_ray_put) final_id = ray.get(recursive.remote(num_tasks_left - 1)) ray.get(final_id) diff --git a/python/ray/worker.py b/python/ray/worker.py index 8218a40e6..568d4a967 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -270,8 +270,15 @@ class Worker: "call 'put' on it (or return it).") serialized_value = self.get_serialization_context().serialize(value) - return self.core_worker.put_serialized_object( - serialized_value, object_id=object_id, pin_object=pin_object) + # This *must* be the first place that we construct this python + # ObjectID because an entry with 0 local references is created when + # the object is Put() in the core worker, expecting that this python + # reference will be created. If another reference is created and + # removed before this one, it will corrupt the state in the + # reference counter. + return ray.ObjectID( + self.core_worker.put_serialized_object( + serialized_value, object_id=object_id, pin_object=pin_object)) def deserialize_objects(self, data_metadata_pairs, object_ids): context = self.get_serialization_context() diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2aebc9c9d..5ac754592 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -390,7 +390,7 @@ Status CoreWorker::Put(const RayObject &object, ObjectID *object_id) { *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex(), - static_cast(TaskTransportType::RAYLET)); + static_cast(TaskTransportType::DIRECT)); reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), rpc_address_); return Put(object, contained_object_ids, *object_id, /*pin_object=*/true); @@ -419,7 +419,7 @@ Status CoreWorker::Put(const RayObject &object, RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } } - return Status::OK(); + return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); } Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, @@ -427,7 +427,7 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t ObjectID *object_id, std::shared_ptr *data) { *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex(), - static_cast(TaskTransportType::RAYLET)); + static_cast(TaskTransportType::DIRECT)); RAY_RETURN_NOT_OK( plasma_store_provider_->Create(metadata, data_size, *object_id, data)); // Only add the object to the reference counter if it didn't already exist. @@ -462,7 +462,7 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, } else { RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); } - return Status::OK(); + return memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id); } Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_ms, @@ -538,13 +538,9 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { bool found = false; - if (object_id.IsDirectCallType()) { - bool in_plasma = false; - found = memory_store_->Contains(object_id, &in_plasma); - if (in_plasma) { - RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found)); - } - } else { + bool in_plasma = false; + found = memory_store_->Contains(object_id, &in_plasma); + if (in_plasma) { RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found)); } *has_object = found; @@ -667,14 +663,9 @@ Status CoreWorker::Delete(const std::vector &object_ids, bool local_on // We only delete from plasma, which avoids hangs (issue #7105). In-memory // objects are always handled by ref counting only. - absl::flat_hash_set plasma_object_ids; - for (const auto &obj_id : object_ids) { - plasma_object_ids.insert(obj_id); - } - RAY_RETURN_NOT_OK(plasma_store_provider_->Delete(plasma_object_ids, local_only, - delete_creating_tasks)); - - return Status::OK(); + absl::flat_hash_set plasma_object_ids(object_ids.begin(), object_ids.end()); + return plasma_store_provider_->Delete(plasma_object_ids, local_only, + delete_creating_tasks); } void CoreWorker::TriggerGlobalGC() { @@ -1407,7 +1398,10 @@ void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &re rpc::PlasmaObjectReadyReply *reply, rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(plasma_done_callback_ != nullptr) << "Plasma done callback not defined."; - // This callback must be asynchronous to allow plasma to receive objects + // This callback needs to be asynchronous because it runs on the io_service_, so no + // RPCs can be processed while it's running. This can easily lead to deadlock (for + // example if the callback calls ray.get() on an object that is dependent on an RPC + // to be ready). plasma_done_callback_(ObjectID::FromBinary(request.object_id()), request.data_size(), request.metadata_size()); send_reply_callback(Status::OK(), nullptr, nullptr); diff --git a/src/ray/core_worker/fiber.h b/src/ray/core_worker/fiber.h index b8b6acdd8..5af1e3361 100644 --- a/src/ray/core_worker/fiber.h +++ b/src/ray/core_worker/fiber.h @@ -129,4 +129,4 @@ class FiberState { } // namespace ray -#endif // RAY_CORE_WORKER_FIBER_H \ No newline at end of file +#endif // RAY_CORE_WORKER_FIBER_H diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index ead233a5f..9391f5eb6 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -158,11 +158,13 @@ std::shared_ptr CoreWorkerMemoryStore::GetOrPromoteToPlasma( } Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &object_id) { - RAY_CHECK(object_id.IsDirectCallType()); std::vector)>> async_callbacks; auto object_entry = std::make_shared(object.GetData(), object.GetMetadata(), object.GetNestedIds(), true); + // TODO(edoakes): we should instead return a flag to the caller to put the object in + // plasma. + bool should_put_in_plasma = false; { absl::MutexLock lock(&mu_); @@ -181,11 +183,9 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec auto promoted_it = promoted_to_plasma_.find(object_id); if (promoted_it != promoted_to_plasma_.end()) { RAY_CHECK(store_in_plasma_ != nullptr); - if (!object.IsInPlasmaError()) { - // Only need to promote to plasma if it wasn't already put into plasma - // by the task that created the object. - store_in_plasma_(object, object_id); - } + // Only need to promote to plasma if it wasn't already put into plasma + // by the task that created the object. + should_put_in_plasma = !object.IsInPlasmaError(); promoted_to_plasma_.erase(promoted_it); } @@ -212,6 +212,13 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec } } + // Must be called without holding the lock because store_in_plasma_ goes + // through the regular CoreWorker::Put() codepath, which calls into the + // in-memory store (would cause deadlock). + if (should_put_in_plasma) { + store_in_plasma_(object, object_id); + } + // It's important for performance to run the callbacks outside the lock. for (const auto &cb : async_callbacks) { cb(object_entry); @@ -250,7 +257,6 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, if (iter != objects_.end()) { (*results)[i] = iter->second; if (remove_after_get) { - RAY_LOG(ERROR) << "REMOVE_AFTER_GET"; // Note that we cannot remove the object_id from `objects_` now, // because `object_ids` might have duplicate ids. ids_to_remove.insert(object_id); @@ -437,7 +443,6 @@ bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id, bool *in_plasma) if (it != objects_.end()) { if (it->second->IsInPlasmaError()) { *in_plasma = true; - return false; } return true; } diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 38ea97c71..4db25d835 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -111,7 +111,7 @@ class CoreWorkerMemoryStore { /// /// \param[in] object_id The object to check. /// \param[out] in_plasma Set to true if the object was spilled to plasma. - /// If this is set to true, Contains() will return false. + /// Will only be true if the store contains the object. /// \return Whether the store has the object. bool Contains(const ObjectID &object_id, bool *in_plasma); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 11e196584..e5540b824 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -951,76 +951,6 @@ TEST_F(SingleNodeTest, TestObjectInterface) { ASSERT_TRUE(!results[1]); } -TEST_F(TwoNodeTest, TestObjectInterfaceCrossNodes) { - CoreWorker worker1(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], - raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", - node_manager_port, nullptr); - - CoreWorker worker2(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[1], - raylet_socket_names_[1], NextJobId(), gcs_options_, "", "127.0.0.1", - node_manager_port, nullptr); - - uint8_t array1[] = {1, 2, 3, 4, 5, 6, 7, 8}; - uint8_t array2[] = {10, 11, 12, 13, 14, 15}; - - std::vector> buffers; - buffers.emplace_back(std::make_shared(array1, sizeof(array1))); - buffers.emplace_back(std::make_shared(array2, sizeof(array2))); - - std::vector ids(buffers.size()); - for (size_t i = 0; i < ids.size(); i++) { - RAY_CHECK_OK(worker1.Put(RayObject(buffers[i], nullptr, std::vector()), {}, - &ids[i])); - } - - // Test Get() from remote node. - std::vector> results; - RAY_CHECK_OK(worker2.Get(ids, -1, &results)); - - ASSERT_EQ(results.size(), 2); - for (size_t i = 0; i < ids.size(); i++) { - ASSERT_EQ(results[i]->GetData()->Size(), buffers[i]->Size()); - ASSERT_EQ(*(results[i]->GetData()), *buffers[i]); - } - - // Test Wait() from remote node. - ObjectID non_existent_id = ObjectID::FromRandom(); - std::vector all_ids(ids); - all_ids.push_back(non_existent_id); - - std::vector wait_results; - RAY_CHECK_OK(worker2.Wait(all_ids, 2, -1, &wait_results)); - ASSERT_EQ(wait_results.size(), 3); - ASSERT_EQ(wait_results, std::vector({true, true, false})); - - RAY_CHECK_OK(worker2.Wait(all_ids, 3, 100, &wait_results)); - ASSERT_EQ(wait_results.size(), 3); - ASSERT_EQ(wait_results, std::vector({true, true, false})); - - // Test Delete() from all machines. - // clear the reference held by PlasmaBuffer. - results.clear(); - RAY_CHECK_OK(worker2.Delete(ids, false, false)); - - // Note that Delete() calls RayletClient::FreeObjects and would not - // wait for objects being deleted, so wait a while for plasma store - // to process the command. - usleep(1000 * 1000); - // Verify objects are deleted from both machines. - ASSERT_TRUE(worker2.Get(ids, 0, &results).IsTimedOut()); - ASSERT_EQ(results.size(), 2); - ASSERT_TRUE(!results[0]); - ASSERT_TRUE(!results[1]); - - // TODO(edoakes): this currently fails because the object is pinned on the - // creating node. Should be fixed or removed once we decide the semantics - // for Delete() with pinning. - // ASSERT_TRUE(worker1.Get(ids, 0, &results).IsTimedOut()); - // ASSERT_EQ(results.size(), 2); - // ASSERT_TRUE(!results[0]); - // ASSERT_TRUE(!results[1]); -} - TEST_F(SingleNodeTest, TestNormalTaskLocal) { std::unordered_map resources; TestNormalTask(resources); diff --git a/src/ray/core_worker/test/scheduling_queue_test.cc b/src/ray/core_worker/test/scheduling_queue_test.cc index cbc608289..bdb20715e 100644 --- a/src/ray/core_worker/test/scheduling_queue_test.cc +++ b/src/ray/core_worker/test/scheduling_queue_test.cc @@ -24,7 +24,7 @@ TEST(SchedulingQueueTest, TestInOrder) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context, 0); + SchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -45,7 +45,7 @@ TEST(SchedulingQueueTest, TestWaitForObjects) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context, 0); + SchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -71,7 +71,7 @@ TEST(SchedulingQueueTest, TestWaitForObjectsNotSubjectToSeqTimeout) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context, 0); + SchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -89,7 +89,7 @@ TEST(SchedulingQueueTest, TestOutOfOrder) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context, 0); + SchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -107,7 +107,7 @@ TEST(SchedulingQueueTest, TestSeqWaitTimeout) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context, 0); + SchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; @@ -130,7 +130,7 @@ TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) { boost::asio::io_service io_service; MockWaiter waiter; WorkerContext context(WorkerType::WORKER, JobID::Nil()); - SchedulingQueue queue(io_service, waiter, context, 0); + SchedulingQueue queue(io_service, waiter, context); int n_ok = 0; int n_rej = 0; auto fn_ok = [&n_ok]() { n_ok++; }; diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 317213bdd..89e840d46 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -183,7 +183,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use"; const TaskSpecification task_spec(request.task_spec()); - RAY_LOG(DEBUG) << "Received task " << task_spec.DebugString(); if (task_spec.IsActorTask() && !worker_context_.CurrentTaskIsDirectCall()) { send_reply_callback(Status::Invalid("This actor doesn't accept direct calls."), nullptr, nullptr); diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index f6c835a43..d37cde352 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -178,14 +178,6 @@ raylet::RayletClient::RayletClient( } Status raylet::RayletClient::SubmitTask(const TaskSpecification &task_spec) { - for (size_t i = 0; i < task_spec.NumArgs(); i++) { - if (task_spec.ArgByRef(i)) { - for (size_t j = 0; j < task_spec.ArgIdCount(i); j++) { - RAY_CHECK(!task_spec.ArgId(i, j).IsDirectCallType()) - << "Passing direct call objects to non-direct tasks is not allowed."; - } - } - } flatbuffers::FlatBufferBuilder fbb; auto message = protocol::CreateSubmitTaskRequest(fbb, fbb.CreateString(task_spec.Serialize()));