From d2662fecead72f8dd123b36b31a3b944bb9631df Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Sat, 23 Nov 2019 15:05:49 -0800 Subject: [PATCH] Miscellaneous bug fixes to throw unreconstructable errors for direct calls (#6245) * Test cases * Fix InPlasmaError * raylet fixes to force errors for direct calls * Disable lineage logging and task pending checks for direct calls * move todo * Clean up tests * Fix bugs in object store for Contains and Delete * Use direct call in tests * Fixes, separate actor creation direct call from normal direct call spec --- python/ray/tests/test_failure.py | 104 ++++++++++++++++++ src/ray/common/ray_object.cc | 4 +- src/ray/common/ray_object.h | 4 +- src/ray/common/task/task_spec.cc | 6 +- src/ray/common/task/task_spec.h | 2 + src/ray/core_worker/context.cc | 2 +- src/ray/core_worker/core_worker.cc | 2 +- .../memory_store/memory_store.cc | 22 +++- .../memory_store/memory_store.h | 10 +- .../store_provider/plasma_store_provider.cc | 4 + src/ray/core_worker/test/core_worker_test.cc | 4 +- src/ray/raylet/lineage_cache.cc | 7 ++ src/ray/raylet/node_manager.cc | 7 +- src/ray/raylet/task_dependency_manager.cc | 5 + 14 files changed, 169 insertions(+), 14 deletions(-) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 804281e95..bc5c38e98 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -799,3 +799,107 @@ def test_fill_object_store_exception(ray_start_cluster_head): with pytest.raises(ray.exceptions.ObjectStoreFullError): ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) + + +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_nodes": 1, + "num_cpus": 2, + }, { + "num_nodes": 2, + "num_cpus": 1, + }], + indirect=True) +def test_direct_call_eviction(ray_start_cluster): + @ray.remote + def large_object(): + return np.zeros(10 * 1024 * 1024) + + large_object = large_object.options(is_direct_call=True) + + obj = large_object.remote() + assert (isinstance(ray.get(obj), np.ndarray)) + # Evict the object. + ray.internal.free([obj]) + while ray.worker.global_worker.core_worker.object_exists(obj): + time.sleep(1) + # ray.get throws an exception. + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(obj) + + @ray.remote + def dependent_task(x): + return + + dependent_task = dependent_task.options(is_direct_call=True) + + # If the object is passed by reference, the task throws an + # exception. + with pytest.raises(ray.exceptions.RayTaskError): + ray.get(dependent_task.remote(obj)) + + +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_nodes": 1, + "num_cpus": 2, + }, { + "num_nodes": 2, + "num_cpus": 1, + }], + indirect=True) +def test_direct_call_serialized_id_eviction(ray_start_cluster): + @ray.remote + def large_object(): + return np.zeros(10 * 1024 * 1024) + + @ray.remote + def get(obj_ids): + print("get", obj_ids) + obj_id = obj_ids[0] + assert (isinstance(ray.get(obj_id), np.ndarray)) + # Evict the object. + ray.internal.free(obj_ids) + while ray.worker.global_worker.core_worker.object_exists(obj_id): + time.sleep(1) + with pytest.raises(ray.exceptions.UnreconstructableError): + ray.get(obj_id) + print("get done", obj_ids) + + large_object = large_object.options(is_direct_call=True) + get = get.options(is_direct_call=True) + + obj = large_object.remote() + ray.get(get.remote([obj])) + + +@pytest.mark.skip( + "Uncomment once eviction errors for serialized IDs are implemented") +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "num_nodes": 2, + "num_cpus": 10, + }, { + "num_nodes": 1, + "num_cpus": 20, + }], + indirect=True) +def test_direct_call_serialized_id(ray_start_cluster): + @ray.remote + def small_object(): + # Sleep a bit before creating the object to force a timeout + # at the getter. + time.sleep(1) + return 1 + + @ray.remote + def get(obj_ids): + print("get", obj_ids) + obj_id = obj_ids[0] + assert ray.get(obj_id) == 1 + + small_object = small_object.options(is_direct_call=True) + get = get.options(is_direct_call=True) + + obj = small_object.remote() + ray.get(get.remote([obj])) diff --git a/src/ray/common/ray_object.cc b/src/ray/common/ray_object.cc index cf5c5c674..76e73b4da 100644 --- a/src/ray/common/ray_object.cc +++ b/src/ray/common/ray_object.cc @@ -2,7 +2,7 @@ namespace ray { -bool RayObject::IsException() { +bool RayObject::IsException() const { if (metadata_ == nullptr) { return false; } @@ -19,7 +19,7 @@ bool RayObject::IsException() { return false; } -bool RayObject::IsInPlasmaError() { +bool RayObject::IsInPlasmaError() const { if (metadata_ == nullptr) { return false; } diff --git a/src/ray/common/ray_object.h b/src/ray/common/ray_object.h index 72c44e672..c7d9d73a6 100644 --- a/src/ray/common/ray_object.h +++ b/src/ray/common/ray_object.h @@ -62,11 +62,11 @@ class RayObject { bool HasMetadata() const { return metadata_ != nullptr; } /// Whether the object represents an exception. - bool IsException(); + bool IsException() const; /// Whether the object has been promoted to plasma (i.e., since it was too /// large to return directly as part of a gRPC response). - bool IsInPlasmaError(); + bool IsInPlasmaError() const; private: std::shared_ptr data_; diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 57f9ff31e..9f2fc0aa3 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -186,11 +186,13 @@ ObjectID TaskSpecification::ActorDummyObject() const { return ReturnId(NumReturns() - 1, TaskTransportType::RAYLET); } -bool TaskSpecification::IsDirectCall() const { +bool TaskSpecification::IsDirectCall() const { return message_->is_direct_call(); } + +bool TaskSpecification::IsDirectActorCreationCall() const { if (IsActorCreationTask()) { return message_->actor_creation_task_spec().is_direct_call(); } else { - return message_->is_direct_call(); + return false; } } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 553bfc082..51dc52d06 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -148,6 +148,8 @@ class TaskSpecification : public MessageWrapper { bool IsDirectCall() const; + bool IsDirectActorCreationCall() const; + int MaxActorConcurrency() const; bool IsAsyncioActor() const; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index d8f745530..37293bddc 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -96,7 +96,7 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { SetCurrentJobId(task_spec.JobId()); RAY_CHECK(current_actor_id_.IsNil()); current_actor_id_ = task_spec.ActorCreationId(); - current_actor_is_direct_call_ = task_spec.IsDirectCall(); + current_actor_is_direct_call_ = task_spec.IsDirectActorCreationCall(); current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); current_actor_is_asyncio_ = task_spec.IsAsyncioActor(); } else if (task_spec.IsActorTask()) { diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6653b51d6..5c6eba0fb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -515,9 +515,9 @@ Status CoreWorker::Delete(const std::vector &object_ids, bool local_on absl::flat_hash_set memory_object_ids; GroupObjectIdsByStoreProvider(object_ids, &plasma_object_ids, &memory_object_ids); + memory_store_->Delete(memory_object_ids, &plasma_object_ids); RAY_RETURN_NOT_OK(plasma_store_provider_->Delete(plasma_object_ids, local_only, delete_creating_tasks)); - memory_store_->Delete(memory_object_ids); return Status::OK(); } diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 575d03de2..ab6dfda9e 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -173,7 +173,11 @@ Status CoreWorkerMemoryStore::Put(const RayObject &object, const ObjectID &objec auto promoted_it = promoted_to_plasma_.find(object_id); if (promoted_it != promoted_to_plasma_.end()) { RAY_CHECK(store_in_plasma_ != nullptr); - store_in_plasma_(object, object_id.WithTransportType(TaskTransportType::RAYLET)); + if (!object.IsInPlasmaError()) { + // Only need to promote to plasma if it wasn't already put into plasma + // by the task that created the object. + store_in_plasma_(object, object_id.WithTransportType(TaskTransportType::RAYLET)); + } promoted_to_plasma_.erase(promoted_it); } @@ -358,10 +362,19 @@ Status CoreWorkerMemoryStore::Wait(const absl::flat_hash_set &object_i return Status::OK(); } -void CoreWorkerMemoryStore::Delete(const absl::flat_hash_set &object_ids) { +void CoreWorkerMemoryStore::Delete(const absl::flat_hash_set &object_ids, + absl::flat_hash_set *plasma_ids_to_delete) { absl::MutexLock lock(&mu_); for (const auto &object_id : object_ids) { - objects_.erase(object_id); + auto it = objects_.find(object_id); + if (it != objects_.end()) { + if (it->second->IsInPlasmaError()) { + plasma_ids_to_delete->insert( + object_id.WithTransportType(TaskTransportType::RAYLET)); + } else { + objects_.erase(it); + } + } } } @@ -375,6 +388,9 @@ void CoreWorkerMemoryStore::Delete(const std::vector &object_ids) { bool CoreWorkerMemoryStore::Contains(const ObjectID &object_id) { absl::MutexLock lock(&mu_); auto it = objects_.find(object_id); + if (it != objects_.end() && it->second->IsInPlasmaError()) { + return false; + } return it != objects_.end(); } diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.h b/src/ray/core_worker/store_provider/memory_store/memory_store.h index 71c1bed03..d847d5bfa 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.h +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.h @@ -82,10 +82,18 @@ class CoreWorkerMemoryStore { std::shared_ptr GetOrPromoteToPlasma(const ObjectID &object_id); /// Delete a list of objects from the object store. + /// NOTE(swang): Objects that contain IsInPlasmaError will not be + /// deleted from the in-memory store. Instead, any future Get + /// calls should check with plasma to see whether the object has + /// been deleted. /// /// \param[in] object_ids IDs of the objects to delete. + /// \param[out] plasma_ids_to_delete This will be extended to + /// include the IDs of the plasma objects to delete, based on the + /// in-memory objects that contained InPlasmaError. /// \return Void. - void Delete(const absl::flat_hash_set &object_ids); + void Delete(const absl::flat_hash_set &object_ids, + absl::flat_hash_set *plasma_ids_to_delete); /// Delete a list of objects from the object store. /// diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index 611856990..c2986ec67 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -27,6 +27,7 @@ Status CoreWorkerPlasmaStoreProvider::SetClientOptions(std::string name, Status CoreWorkerPlasmaStoreProvider::Put(const RayObject &object, const ObjectID &object_id) { + RAY_CHECK(!object.IsInPlasmaError()) << object_id; std::shared_ptr data; RAY_RETURN_NOT_OK(Create(object.GetMetadata(), object.HasData() ? object.GetData()->Size() : 0, object_id, @@ -178,6 +179,9 @@ Status CoreWorkerPlasmaStoreProvider::Get( } size_t previous_size = remaining.size(); + // TODO: For direct calls, use NotifyDirectCallTaskBlocked/Unblocked calls + // for missing objects instead of going through the normal fetch-and-get + // codepath. RAY_RETURN_NOT_OK(FetchAndGetFromPlasmaStore(remaining, batch_ids, batch_timeout, /*fetch_only=*/false, task_id, results, got_exception)); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 37b3962bc..5e2d43179 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -686,7 +686,9 @@ TEST_F(SingleNodeTest, TestMemoryStoreProvider) { // clear the reference held. results.clear(); - provider.Delete(ids_set); + absl::flat_hash_set plasma_object_ids; + provider.Delete(ids_set, &plasma_object_ids); + ASSERT_TRUE(plasma_object_ids.empty()); usleep(200 * 1000); ASSERT_TRUE(provider.Get(ids_set, 0, ctx, &results, &got_exception).IsTimedOut()); diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index 30d0d2742..863761440 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -167,6 +167,9 @@ void LineageCache::AddUncommittedLineage(const TaskID &task_id, auto entry = uncommitted_lineage.GetEntry(task_id); if (!entry) { return; + } else if (entry->TaskData().GetTaskSpecification().IsDirectCall()) { + // Disable lineage logging for direct tasks. + return; } RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED); @@ -184,6 +187,10 @@ void LineageCache::AddUncommittedLineage(const TaskID &task_id, } bool LineageCache::CommitTask(const Task &task) { + if (task.GetTaskSpecification().IsDirectCall()) { + // Disable lineage logging for direct tasks. + return true; + } const TaskID task_id = task.GetTaskSpecification().TaskId(); RAY_LOG(DEBUG) << "Committing task " << task_id << " on " << client_id_; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9a70aa817..566b96bd4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2181,7 +2181,7 @@ std::shared_ptr NodeManager::CreateActorTableDataFromCreationTas // This is the first time that the actor has been created, so the number // of remaining reconstructions is the max. actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions()); - actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall()); + actor_info_ptr->set_is_direct_call(task_spec.IsDirectActorCreationCall()); actor_info_ptr->set_is_detached(task_spec.IsDetachedActor()); actor_info_ptr->mutable_owner_address()->CopyFrom( task_spec.GetMessage().caller_address()); @@ -2414,6 +2414,11 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object RAY_LOG(DEBUG) << "Attempting to resubmit task " << task.GetTaskSpecification().TaskId(); + if (task.GetTaskSpecification().IsDirectCall()) { + TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE); + return; + } + // Actors should only be recreated if the first initialization failed or if // the most recent instance of the actor failed. if (task.GetTaskSpecification().IsActorCreationTask()) { diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index 83137ef15..b77b93244 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -309,6 +309,11 @@ std::vector TaskDependencyManager::GetPendingTasks() const { } void TaskDependencyManager::TaskPending(const Task &task) { + // Direct tasks are not tracked by the raylet. + if (task.GetTaskSpecification().IsDirectCall()) { + return; + } + TaskID task_id = task.GetTaskSpecification().TaskId(); RAY_LOG(DEBUG) << "Task execution " << task_id << " pending";