diff --git a/python/ray/async_compat.py b/python/ray/async_compat.py index 2b839c77b..143eb1c88 100644 --- a/python/ray/async_compat.py +++ b/python/ray/async_compat.py @@ -92,5 +92,7 @@ def get_async(object_id): inner_future.add_done_callback(done_callback) # A hack to keep reference to inner_future so it doesn't get GC. user_future.inner_future = inner_future + # A hack to keep a reference to the object ID for ref counting. + user_future.object_id = object_id return user_future diff --git a/python/ray/includes/unique_ids.pxi b/python/ray/includes/unique_ids.pxi index 5aaf65d6d..5b596bf44 100644 --- a/python/ray/includes/unique_ids.pxi +++ b/python/ray/includes/unique_ids.pxi @@ -170,9 +170,6 @@ cdef class ObjectID(BaseID): def is_direct_call_type(self): return self.data.IsDirectCallType() - def with_plasma_transport_type(self): - return ObjectID(self.data.WithPlasmaTransportType().Binary()) - def is_nil(self): return self.data.IsNil() diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index 685109836..b4512cb74 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -54,7 +54,7 @@ def test_dying_worker_get(ray_start_2_cpus): assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. - ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type()) + ray.worker.global_worker.put_object(1, x_id) time.sleep(0.1) # Make sure that nothing has died. @@ -97,7 +97,7 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}"))) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. - ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type()) + ray.worker.global_worker.put_object(1, x_id) time.sleep(0.1) # Make sure that nothing has died. @@ -137,7 +137,7 @@ def test_dying_worker_wait(ray_start_2_cpus): time.sleep(0.1) # Create the object. - ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type()) + ray.worker.global_worker.put_object(1, x_id) time.sleep(0.1) # Make sure that nothing has died. @@ -180,7 +180,7 @@ ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))]) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # wait can return. - ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type()) + ray.worker.global_worker.put_object(1, x_id) time.sleep(0.1) # Make sure that nothing has died. diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 8ac249fa6..4123f865d 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -1,7 +1,7 @@ -#include - #include "ray/core_worker/actor_handle.h" +#include + namespace { ray::rpc::ActorHandle CreateInnerActorHandle( diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index dd5a9b6d7..d6665440d 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -1,4 +1,5 @@ #include "ray/core_worker/actor_manager.h" + #include "ray/gcs/redis_actor_info_accessor.h" namespace ray { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ed2abadd7..f48b2f6af 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -329,9 +329,8 @@ void CoreWorker::PromoteToPlasmaAndGetOwnershipInfo(const ObjectID &object_id, rpc::Address *owner_address) { RAY_CHECK(object_id.IsDirectCallType()); auto value = memory_store_->GetOrPromoteToPlasma(object_id); - if (value != nullptr) { - RAY_CHECK_OK( - plasma_store_provider_->Put(*value, object_id.WithPlasmaTransportType())); + if (value) { + RAY_CHECK_OK(plasma_store_provider_->Put(*value, object_id)); } auto has_owner = reference_counter_->GetOwner(object_id, owner_id, owner_address); @@ -405,51 +404,31 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m bool got_exception = false; absl::flat_hash_map> result_map; auto start_time = current_time_ms(); - RAY_RETURN_NOT_OK(plasma_store_provider_->Get( - plasma_object_ids, timeout_ms, worker_context_, &result_map, &got_exception)); - if (!got_exception) { - int64_t local_timeout_ms = timeout_ms; - if (timeout_ms >= 0) { - local_timeout_ms = std::max(static_cast(0), - timeout_ms - (current_time_ms() - start_time)); - } - RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, local_timeout_ms, - worker_context_, &result_map, &got_exception)); + if (!memory_object_ids.empty()) { + RAY_RETURN_NOT_OK(memory_store_->Get(memory_object_ids, timeout_ms, worker_context_, + &result_map, &got_exception)); } if (!got_exception) { // If any of the objects have been promoted to plasma, then we retry their // gets at the provider plasma. Once we get the objects from plasma, we flip // the transport type again and return them for the original direct call ids. - absl::flat_hash_set promoted_plasma_ids; for (const auto &pair : result_map) { if (pair.second->IsInPlasmaError()) { - RAY_LOG(DEBUG) << pair.first << " in plasma, doing fetch-and-get"; - promoted_plasma_ids.insert( - pair.first.WithTransportType(TaskTransportType::RAYLET)); + RAY_LOG(INFO) << pair.first << " in plasma, doing fetch-and-get"; + plasma_object_ids.insert(pair.first); } } - if (!promoted_plasma_ids.empty()) { - int64_t local_timeout_ms = timeout_ms; - if (timeout_ms >= 0) { - local_timeout_ms = std::max(static_cast(0), - timeout_ms - (current_time_ms() - start_time)); - } - RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms; - RAY_RETURN_NOT_OK(plasma_store_provider_->Get(promoted_plasma_ids, local_timeout_ms, - worker_context_, &result_map, - &got_exception)); - for (const auto &id : promoted_plasma_ids) { - auto it = result_map.find(id); - if (it == result_map.end()) { - result_map.erase(id.WithTransportType(TaskTransportType::DIRECT)); - } else { - result_map[id.WithTransportType(TaskTransportType::DIRECT)] = it->second; - } - result_map.erase(id); - } + int64_t local_timeout_ms = timeout_ms; + if (timeout_ms >= 0) { + local_timeout_ms = std::max(static_cast(0), + timeout_ms - (current_time_ms() - start_time)); } + RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms; + RAY_RETURN_NOT_OK(plasma_store_provider_->Get(plasma_object_ids, local_timeout_ms, + worker_context_, &result_map, + &got_exception)); } // Loop through `ids` and fill each entry for the `results` vector, @@ -478,7 +457,7 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m } // If no timeout was set and none of the results will throw an exception, // then check that we fetched all results before returning. - if (timeout_ms >= 0 && !will_throw_exception) { + if (timeout_ms < 0 && !will_throw_exception) { RAY_CHECK(!missing_result); } @@ -488,15 +467,13 @@ Status CoreWorker::Get(const std::vector &ids, const int64_t timeout_m Status CoreWorker::Contains(const ObjectID &object_id, bool *has_object) { bool found = false; if (object_id.IsDirectCallType()) { - // Note that the memory store returns false if the object value is - // ErrorType::OBJECT_IN_PLASMA. - found = memory_store_->Contains(object_id); - } - if (!found) { - // We check plasma as a fallback in all cases, since a direct call object - // may have been spilled to plasma. - RAY_RETURN_NOT_OK(plasma_store_provider_->Contains( - object_id.WithTransportType(TaskTransportType::RAYLET), &found)); + bool in_plasma = false; + found = memory_store_->Contains(object_id, &in_plasma); + if (in_plasma) { + RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found)); + } + } else { + RAY_RETURN_NOT_OK(plasma_store_provider_->Contains(object_id, &found)); } *has_object = found; return Status::OK(); @@ -847,9 +824,8 @@ Status CoreWorker::AllocateReturnObjects( RayConfig::instance().max_direct_call_object_size()) { data_buffer = std::make_shared(data_sizes[i]); } else { - RAY_RETURN_NOT_OK(Create( - metadatas[i], data_sizes[i], - object_ids[i].WithTransportType(TaskTransportType::RAYLET), &data_buffer)); + RAY_RETURN_NOT_OK( + Create(metadatas[i], data_sizes[i], object_ids[i], &data_buffer)); object_already_exists = !data_buffer; } } @@ -910,7 +886,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, continue; } if (return_objects->at(i)->GetData()->IsPlasmaBuffer()) { - if (!Seal(return_ids[i].WithTransportType(TaskTransportType::RAYLET)).ok()) { + if (!Seal(return_ids[i]).ok()) { RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object " << return_ids[i] << " in store: " << status.message(); } @@ -943,16 +919,23 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, args->resize(num_args); arg_reference_ids->resize(num_args); - std::vector object_ids_to_fetch; - std::vector indices; + absl::flat_hash_set by_ref_ids; + absl::flat_hash_map by_ref_indices; for (size_t i = 0; i < task.NumArgs(); ++i) { int count = task.ArgIdCount(i); if (count > 0) { // pass by reference. RAY_CHECK(count == 1); - object_ids_to_fetch.push_back(task.ArgId(i, 0)); - indices.push_back(i); + // Direct call type objects that weren't inlined have been promoted to plasma. + // We need to put an OBJECT_IN_PLASMA error here so the subsequent call to Get() + // properly redirects to the plasma store. + if (task.ArgId(i, 0).IsDirectCallType()) { + RAY_CHECK_OK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + task.ArgId(i, 0))); + } + by_ref_ids.insert(task.ArgId(i, 0)); + by_ref_indices.emplace(task.ArgId(i, 0), i); arg_reference_ids->at(i) = task.ArgId(i, 0); } else { // pass by value. @@ -971,15 +954,16 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, } } - std::vector> results; - auto status = Get(object_ids_to_fetch, -1, &results); - if (status.ok()) { - for (size_t i = 0; i < results.size(); i++) { - args->at(indices[i]) = results[i]; - } + // Fetch by-reference arguments directly from the plasma store. + bool got_exception = false; + absl::flat_hash_map> result_map; + RAY_RETURN_NOT_OK(plasma_store_provider_->Get(by_ref_ids, -1, worker_context_, + &result_map, &got_exception)); + for (const auto &it : result_map) { + args->at(by_ref_indices[it.first]) = it.second; } - return status; + return Status::OK(); } void CoreWorker::HandleAssignTask(const rpc::AssignTaskRequest &request, @@ -1077,7 +1061,7 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c memory_store_->GetAsync(object_id, [python_future, success_callback, fallback_callback, object_id](std::shared_ptr ray_object) { if (ray_object->IsInPlasmaError()) { - fallback_callback(ray_object, object_id.WithPlasmaTransportType(), python_future); + fallback_callback(ray_object, object_id, python_future); } else { success_callback(ray_object, object_id, python_future); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 9eda952c5..806742027 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -130,7 +130,7 @@ class CoreWorker { /// called on object IDs that were created randomly, e.g., /// ObjectID::FromRandom. /// - /// Postcondition: Get(object_id.WithPlasmaTransportType()) is valid. + /// Postcondition: Get(object_id) is valid. /// /// \param[in] object_id The object ID to serialize. /// \param[out] owner_id The ID of the object's owner. This should be diff --git a/src/ray/core_worker/profiling.cc b/src/ray/core_worker/profiling.cc index cb55d5ac6..acdc7c024 100644 --- a/src/ray/core_worker/profiling.cc +++ b/src/ray/core_worker/profiling.cc @@ -1,7 +1,7 @@ -#include - #include "ray/core_worker/profiling.h" +#include + namespace ray { namespace worker { diff --git a/src/ray/core_worker/profiling.h b/src/ray/core_worker/profiling.h index 913e72800..dc0c92a4f 100644 --- a/src/ray/core_worker/profiling.h +++ b/src/ray/core_worker/profiling.h @@ -4,7 +4,6 @@ #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" #include "absl/time/clock.h" - #include "ray/core_worker/context.h" #include "ray/gcs/redis_gcs_client.h" diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index b634ee4e9..78903f7b6 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -1,4 +1,5 @@ #include + #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" @@ -176,7 +177,7 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec if (!object.IsInPlasmaError()) { // Only need to promote to plasma if it wasn't already put into plasma // by the task that created the object. - store_in_plasma_(object, object_id.WithTransportType(TaskTransportType::RAYLET)); + store_in_plasma_(object, object_id); } promoted_to_plasma_.erase(promoted_it); } @@ -367,8 +368,7 @@ void CoreWorkerMemoryStore::Delete(const absl::flat_hash_set &object_i auto it = objects_.find(object_id); if (it != objects_.end()) { if (it->second->IsInPlasmaError()) { - plasma_ids_to_delete->insert( - object_id.WithTransportType(TaskTransportType::RAYLET)); + plasma_ids_to_delete->insert(object_id); } else { objects_.erase(it); } @@ -383,13 +383,17 @@ void CoreWorkerMemoryStore::Delete(const std::vector &object_ids) { } } -bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id) { +bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id, bool *in_plasma) { absl::MutexLock lock(&mu_); auto it = objects_.find(object_id); - if (it != objects_.end() && it->second->IsInPlasmaError()) { - return false; + if (it != objects_.end()) { + if (it->second->IsInPlasmaError()) { + *in_plasma = true; + return false; + } + return true; } - return it != objects_.end(); + return false; } } // namespace ray diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 09f08297f..239e1e7ce 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -104,8 +104,10 @@ class CoreWorkerMemoryStore { /// Check whether this store contains the object. /// /// \param[in] object_id The object to check. + /// \param[out] in_plasma Set to true if the object was spilled to plasma. + /// If this is set to true, Contains() will return false. /// \return Whether the store has the object. - bool Contains(const ObjectID &object_id); + bool Contains(const ObjectID &object_id, bool *in_plasma); /// Returns the number of objects in this store. /// diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 11b9f22df..482863d7c 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -1,4 +1,5 @@ #include "ray/core_worker/store_provider/plasma_store_provider.h" + #include "ray/common/ray_config.h" #include "ray/core_worker/context.h" #include "ray/core_worker/core_worker.h" @@ -48,7 +49,6 @@ Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &meta const size_t data_size, const ObjectID &object_id, std::shared_ptr *data) { - RAY_CHECK(!object_id.IsDirectCallType()); auto plasma_id = object_id.ToPlasmaId(); std::shared_ptr arrow_buffer; { diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 1be4fc4ca..f93439c36 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -1,4 +1,5 @@ #include "ray/core_worker/task_manager.h" + #include "ray/util/util.h" namespace ray { diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 870a7b351..d3e09fc3f 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -4,7 +4,6 @@ #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" - #include "ray/common/id.h" #include "ray/common/task/task.h" #include "ray/core_worker/actor_manager.h" diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 3582f51d7..abce6d4d1 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -1,9 +1,9 @@ -#include "gtest/gtest.h" +#include "ray/core_worker/transport/direct_task_transport.h" +#include "gtest/gtest.h" #include "ray/common/task/task_spec.h" #include "ray/common/task/task_util.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" -#include "ray/core_worker/transport/direct_task_transport.h" #include "ray/raylet/raylet_client.h" #include "ray/rpc/worker/core_worker_client.h" #include "src/ray/util/test_util.h" @@ -168,8 +168,8 @@ TEST(LocalDependencyResolverTest, TestHandlePlasmaPromotion) { resolver.ResolveDependencies(task, [&ok]() { ok = true; }); ASSERT_TRUE(ok); ASSERT_TRUE(task.ArgByRef(0)); - // Checks that the object id was promoted to a plasma type id. - ASSERT_FALSE(task.ArgId(0, 0).IsDirectCallType()); + // Checks that the object id is still a direct call id. + ASSERT_TRUE(task.ArgId(0, 0).IsDirectCallType()); ASSERT_EQ(resolver.NumPendingTasks(), 0); } diff --git a/src/ray/core_worker/transport/dependency_resolver.cc b/src/ray/core_worker/transport/dependency_resolver.cc index b8fd15cd2..89b5237a8 100644 --- a/src/ray/core_worker/transport/dependency_resolver.cc +++ b/src/ray/core_worker/transport/dependency_resolver.cc @@ -32,8 +32,7 @@ void InlineDependencies( mutable_arg->clear_object_ids(); if (it->second->IsInPlasmaError()) { // Promote the object id to plasma. - mutable_arg->add_object_ids( - it->first.WithTransportType(TaskTransportType::RAYLET).Binary()); + mutable_arg->add_object_ids(it->first.Binary()); } else { // Inline the object value. if (it->second->HasData()) { diff --git a/src/ray/core_worker/transport/dependency_resolver.h b/src/ray/core_worker/transport/dependency_resolver.h index a2ce057ae..b30c5e4a0 100644 --- a/src/ray/core_worker/transport/dependency_resolver.h +++ b/src/ray/core_worker/transport/dependency_resolver.h @@ -21,8 +21,8 @@ class LocalDependencyResolver { // /// Note: This method **will mutate** the given TaskSpecification. /// - /// Postcondition: all direct call ids in arguments are converted to values and all - /// remaining by-reference arguments are TaskTransportType::RAYLET. + /// Postcondition: all direct call id arguments that haven't been spilled to plasma + /// are converted to values and all remaining arguments are arguments in the task spec. void ResolveDependencies(TaskSpecification &task, std::function on_complete); /// Return the number of tasks pending dependency resolution.