From 4e81804cba995fe87b0f2339a72025db1c5d7720 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 15 Jul 2020 14:55:51 -0700 Subject: [PATCH] [core] Replace task resubmission in raylet with ownership protocol (#9394) * Add intended worker ID to GetObjectStatus, tests * Remove TaskID owner_id * lint * Add owner address to task args * Make TaskArg a virtual class, remove multi args * Set owner address for task args * merge * Fix tests * Add ObjectRefs to task dependency manager, pass from task spec args * tmp * tmp * Fix * Add ownership info for task arguments * Convert WaitForDirectActorCallArgs * lint * build * update * build * java * Move code * build * Revert "Fix Google log directory again (#9063)" This reverts commit 275da2e4003b56e5c315ceae53a2e5f5ad7874c1. * Fix free * Regression tests - shorten timeouts in reconstruction unit tests * Remove timeout for non-actor tasks * Modify tests using ray.internal.free * Clean up future resolution code * Raylet polls the owner * todo * comment * Update src/ray/core_worker/core_worker.cc Co-authored-by: Edward Oakes * Drop stale actor table notifications * Fix bug where actor restart hangs * Revert buggy code for duplicate tasks * build * Fix errors for lru_evict and internal.free * Revert "Drop stale actor table notifications" This reverts commit 193c5d20e5577befd43f166e16c972e2f9247c91. * Revert "build" This reverts commit 5644edbac906ff6ef98feb40b6f62c9e63698c29. * Fix free test * Fixes for freed objects Co-authored-by: Edward Oakes --- python/ray/internal/internal_api.py | 6 +- python/ray/tests/test_advanced.py | 5 +- python/ray/tests/test_cancel.py | 6 +- python/ray/tests/test_failure.py | 4 - python/ray/tests/test_reconstruction.py | 7 + src/ray/core_worker/core_worker.cc | 63 ++++--- src/ray/core_worker/future_resolver.cc | 20 ++- src/ray/core_worker/reference_count.cc | 5 + src/ray/core_worker/reference_count.h | 6 + src/ray/core_worker/reference_count_test.cc | 7 + src/ray/core_worker/test/core_worker_test.cc | 8 +- src/ray/protobuf/core_worker.proto | 1 + src/ray/raylet/node_manager.cc | 179 ++++++++++++------- src/ray/raylet/task_dependency_manager.cc | 18 +- src/ray/raylet/task_dependency_manager.h | 11 ++ 15 files changed, 239 insertions(+), 107 deletions(-) diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index b9279610d..04c260ffa 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -30,7 +30,7 @@ def memory_summary(): def free(object_refs, local_only=False, delete_creating_tasks=False): - """Free a list of IDs from object stores. + """Free a list of IDs from the in-process and plasma object stores. This function is a low-level API which should be used in restricted scenarios. @@ -38,8 +38,8 @@ def free(object_refs, local_only=False, delete_creating_tasks=False): If local_only is false, the request will be send to all object stores. This method will not return any value to indicate whether the deletion is - successful or not. This function is an instruction to object store. If - the some of the objects are in use, object stores will delete them later + successful or not. This function is an instruction to the object store. If + some of the objects are in use, the object stores will delete them later when the ref count is down to 0. Examples: diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index b86100358..990e10700 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -33,11 +33,12 @@ def test_internal_free(shutdown_only): sampler = Sampler.remote() - # Free does not delete from in-memory store. + # Free deletes from in-memory store. obj_ref = sampler.sample.remote() ray.get(obj_ref) ray.internal.free(obj_ref) - assert ray.get(obj_ref) == [1, 2, 3, 4, 5] + with pytest.raises(Exception): + ray.get(obj_ref) # Free deletes big objects from plasma store. big_id = sampler.sample_big.remote() diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index 51d905da1..17fc14967 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -6,13 +6,15 @@ import pytest import ray from ray.exceptions import RayCancellationError, RayTaskError, \ - RayTimeoutError, RayWorkerError + RayTimeoutError, RayWorkerError, \ + UnreconstructableError from ray.test_utils import SignalActor def valid_exceptions(use_force): if use_force: - return (RayTaskError, RayCancellationError, RayWorkerError) + return (RayTaskError, RayCancellationError, RayWorkerError, + UnreconstructableError) else: return (RayTaskError, RayCancellationError) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 61f762b5c..7ec85ba20 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -821,8 +821,6 @@ def test_raylet_crash_when_get(ray_start_regular): object_ref = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) ray.internal.free(object_ref) - while ray.worker.global_worker.core_worker.object_exists(object_ref): - time.sleep(1) thread = threading.Thread(target=sleep_to_kill_raylet) thread.start() @@ -984,8 +982,6 @@ def test_eviction(ray_start_cluster): assert (isinstance(ray.get(obj), np.ndarray)) # Evict the object. ray.internal.free([obj]) - while ray.worker.global_worker.core_worker.object_exists(obj): - time.sleep(1) # ray.get throws an exception. with pytest.raises(ray.exceptions.UnreconstructableError): ray.get(obj) diff --git a/python/ray/tests/test_reconstruction.py b/python/ray/tests/test_reconstruction.py index 81e1324ed..42f76d390 100644 --- a/python/ray/tests/test_reconstruction.py +++ b/python/ray/tests/test_reconstruction.py @@ -13,6 +13,7 @@ def test_cached_object(ray_start_cluster): config = json.dumps({ "num_heartbeats_timeout": 10, "raylet_heartbeat_timeout_milliseconds": 100, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. @@ -56,6 +57,7 @@ def test_reconstruction_cached_dependency(ray_start_cluster, "raylet_heartbeat_timeout_milliseconds": 100, "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, "free_objects_period_milliseconds": -1, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. @@ -109,6 +111,7 @@ def test_basic_reconstruction(ray_start_cluster, reconstruction_enabled): "raylet_heartbeat_timeout_milliseconds": 100, "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, "free_objects_period_milliseconds": -1, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. @@ -152,6 +155,7 @@ def test_basic_reconstruction_put(ray_start_cluster, reconstruction_enabled): "raylet_heartbeat_timeout_milliseconds": 100, "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, "free_objects_period_milliseconds": -1, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. @@ -198,6 +202,7 @@ def test_multiple_downstream_tasks(ray_start_cluster, reconstruction_enabled): "raylet_heartbeat_timeout_milliseconds": 100, "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, "free_objects_period_milliseconds": -1, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. @@ -252,6 +257,7 @@ def test_reconstruction_chain(ray_start_cluster, reconstruction_enabled): "raylet_heartbeat_timeout_milliseconds": 100, "lineage_pinning_enabled": 1 if reconstruction_enabled else 0, "free_objects_period_milliseconds": -1, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. @@ -298,6 +304,7 @@ def test_reconstruction_stress(ray_start_cluster): "free_objects_period_milliseconds": -1, "max_direct_call_object_size": 100, "task_retry_delay_ms": 100, + "initial_reconstruction_timeout_milliseconds": 200, }) cluster = ray_start_cluster # Head node with no resources. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index abb9f2c14..8e9293bc7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -875,6 +875,7 @@ Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, })); } else { RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id)); + reference_counter_->FreePlasmaObjects({object_id}); } RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id)); return Status::OK(); @@ -1070,8 +1071,16 @@ Status CoreWorker::Delete(const std::vector &object_ids, bool local_on // logged and the object will not get released. reference_counter_->FreePlasmaObjects(object_ids); + // Store an error in the in-memory store to indicate that the plasma value is + // no longer reachable. + memory_store_->Delete(object_ids); + for (const auto &object_id : object_ids) { + RAY_CHECK(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), + object_id)); + } + // We only delete from plasma, which avoids hangs (issue #7105). In-memory - // objects are always handled by ref counting only. + // objects can only be deleted once the ref count goes to 0. absl::flat_hash_set plasma_object_ids(object_ids.begin(), object_ids.end()); return plasma_store_provider_->Delete(plasma_object_ids, local_only, delete_creating_tasks); @@ -1708,31 +1717,37 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &reques ObjectID object_id = ObjectID::FromBinary(request.object_id()); RAY_LOG(DEBUG) << "Received GetObjectStatus " << object_id; - // We own the task. Reply back to the borrower once the object has been - // created. - // TODO(swang): We could probably just send the object value if it is small - // enough and we have it local. - reply->set_status(rpc::GetObjectStatusReply::CREATED); - if (task_manager_->IsTaskPending(object_id.TaskId())) { - // Acquire a reference and retry. This prevents the object from being - // evicted out from under us before we can start the get. - AddLocalReference(object_id, ""); - if (task_manager_->IsTaskPending(object_id.TaskId())) { - // The task is pending. Send the reply once the task finishes. - memory_store_->GetAsync(object_id, - [send_reply_callback](std::shared_ptr obj) { - send_reply_callback(Status::OK(), nullptr, nullptr); - }); - RemoveLocalReference(object_id); - } else { - // We lost the race, the task is done. - RemoveLocalReference(object_id); - send_reply_callback(Status::OK(), nullptr, nullptr); - } - } else { - // The task is done. Send the reply immediately. + // Acquire a reference to the object. This prevents the object from being + // evicted out from under us while we check the object status and start the + // Get. + AddLocalReference(object_id, ""); + + rpc::Address owner_address; + auto has_owner = reference_counter_->GetOwner(object_id, &owner_address); + if (!has_owner) { + // We owned this object, but the object has gone out of scope. + reply->set_status(rpc::GetObjectStatusReply::OUT_OF_SCOPE); send_reply_callback(Status::OK(), nullptr, nullptr); + } else { + RAY_CHECK(owner_address.worker_id() == request.owner_worker_id()); + + if (reference_counter_->IsPlasmaObjectFreed(object_id)) { + reply->set_status(rpc::GetObjectStatusReply::FREED); + } else { + reply->set_status(rpc::GetObjectStatusReply::CREATED); + } + // Send the reply once the value has become available. The value is + // guaranteed to become available eventually because we own the object and + // its ref count is > 0. + // TODO(swang): We could probably just send the object value if it is small + // enough and we have it local. + memory_store_->GetAsync(object_id, + [send_reply_callback](std::shared_ptr obj) { + send_reply_callback(Status::OK(), nullptr, nullptr); + }); } + + RemoveLocalReference(object_id); } void CoreWorker::HandleWaitForActorOutOfScope( diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index 7eb6620e7..6d1317415 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -42,11 +42,21 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, RAY_LOG(WARNING) << "Error retrieving the value of object ID " << object_id << " that was deserialized: " << status.ToString(); } - // Either the owner is gone or the owner replied that the object has - // been created. In both cases, we can now try to fetch the object via - // plasma. - RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), - object_id)); + if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) { + // The owner is gone or the owner replied that the object has gone + // out of scope (this is an edge case in the distributed ref counting + // protocol where a borrower dies before it can notify the owner of + // another borrower). Store an error so that an exception will be + // thrown immediately when the worker tries to get the value. + RAY_UNUSED(in_memory_store_->Put( + RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE), object_id)); + } else { + // We can now try to fetch the object via plasma. If the owner later + // fails or the object is released, the raylet will eventually store + // an error in plasma on our behalf. + RAY_UNUSED(in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), + object_id)); + } })); } diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index 5113f47cd..587fb8c64 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -368,6 +368,11 @@ std::vector ReferenceCounter::GetOwnerAddresses( return owner_addresses; } +bool ReferenceCounter::IsPlasmaObjectFreed(const ObjectID &object_id) const { + absl::MutexLock lock(&mutex_); + return freed_objects_.find(object_id) != freed_objects_.end(); +} + void ReferenceCounter::FreePlasmaObjects(const std::vector &object_ids) { absl::MutexLock lock(&mutex_); for (const ObjectID &object_id : object_ids) { diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index 155090eb9..39d09211b 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -208,6 +208,12 @@ class ReferenceCounter : public ReferenceCounterInterface { std::vector GetOwnerAddresses( const std::vector object_ids) const; + /// Check whether an object value has been freed. + /// + /// \param[in] object_id The object to check. + /// \return Whether the object value has been freed. + bool IsPlasmaObjectFreed(const ObjectID &object_id) const; + /// Release the underlying value from plasma (if any) for these objects. /// /// \param[in] object_ids The IDs whose values to free. diff --git a/src/ray/core_worker/reference_count_test.cc b/src/ray/core_worker/reference_count_test.cc index a38d4e161..8f29456a1 100644 --- a/src/ray/core_worker/reference_count_test.cc +++ b/src/ray/core_worker/reference_count_test.cc @@ -2028,26 +2028,33 @@ TEST_F(ReferenceCountTest, TestFree) { // Test free before receiving information about where the object is pinned. rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); + ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); rc->AddLocalReference(id, ""); rc->FreePlasmaObjects({id}); + ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); ASSERT_FALSE(rc->SetDeleteCallback(id, callback)); ASSERT_EQ(deleted->count(id), 0); rc->UpdateObjectPinnedAtRaylet(id, node_id); bool pinned = true; ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); ASSERT_FALSE(pinned); + ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); rc->RemoveLocalReference(id, nullptr); + ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); // Test free after receiving information about where the object is pinned. rc->AddOwnedObject(id, {}, rpc::Address(), "", 0, true); rc->AddLocalReference(id, ""); ASSERT_TRUE(rc->SetDeleteCallback(id, callback)); rc->UpdateObjectPinnedAtRaylet(id, node_id); + ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); rc->FreePlasmaObjects({id}); + ASSERT_TRUE(rc->IsPlasmaObjectFreed(id)); ASSERT_TRUE(deleted->count(id) > 0); ASSERT_TRUE(rc->IsPlasmaObjectPinned(id, &pinned)); ASSERT_FALSE(pinned); rc->RemoveLocalReference(id, nullptr); + ASSERT_FALSE(rc->IsPlasmaObjectFreed(id)); } } // namespace ray diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index b4727cf17..18ac0a6c1 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -822,12 +822,12 @@ TEST_F(SingleNodeTest, TestObjectInterface) { // wait for objects being deleted, so wait a while for plasma store // to process the command. usleep(200 * 1000); - ASSERT_TRUE(core_worker.Get(ids, 0, &results).IsTimedOut()); + ASSERT_TRUE(core_worker.Get(ids, 0, &results).ok()); // Since array2 has been deleted from the plasma store, the Get should - // timeout and return nullptr for all results. + // return UnreconstructableError for all results. ASSERT_EQ(results.size(), 2); - ASSERT_TRUE(!results[0]); - ASSERT_TRUE(!results[1]); + ASSERT_TRUE(results[0]->IsException()); + ASSERT_TRUE(results[1]->IsException()); } TEST_F(SingleNodeTest, TestNormalTaskLocal) { diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 655ae7642..a6e93aad5 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -154,6 +154,7 @@ message GetObjectStatusReply { enum ObjectStatus { CREATED = 0; OUT_OF_SCOPE = 1; + FREED = 2; } ObjectStatus status = 1; } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 0c867e971..8c64e32a7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2257,6 +2257,7 @@ void NodeManager::MarkObjectsAsFailed(const ErrorType &error_type, status = store_client_.Seal(object_id); } if (!status.ok() && !status.IsObjectExists()) { + RAY_LOG(INFO) << "Marking plasma object failed " << object_id; // If we failed to save the error code, log a warning and push an error message // to the driver. std::ostringstream stream; @@ -2759,7 +2760,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { } } else { // (See design_docs/task_states.rst for the state transition diagram.) - RAY_CHECK(local_queues_.RemoveTask(task_id, &task)); + RAY_CHECK(local_queues_.RemoveTask(task_id, &task)) << task_id; // Release task's resources. The worker's lifetime resources are still held. auto const &task_resources = worker.GetTaskResourceIds(); @@ -3022,76 +3023,130 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id void NodeManager::HandleTaskReconstruction(const TaskID &task_id, const ObjectID &required_object_id) { - // Retrieve the task spec in order to re-execute the task. - RAY_CHECK_OK(gcs_client_->Tasks().AsyncGet( - task_id, - /*callback=*/ - [this, required_object_id, task_id]( - Status status, const boost::optional &task_data) { - if (task_data) { - // The task was in the GCS task table. Use the stored task spec to - // re-execute the task. - ResubmitTask(Task(task_data->task()), required_object_id); - return; - } - // The task was not in the GCS task table. It must therefore be in the - // lineage cache. - if (lineage_cache_.ContainsTask(task_id)) { - // Use a copy of the cached task spec to re-execute the task. - const Task task = lineage_cache_.GetTaskOrDie(task_id); - ResubmitTask(task, required_object_id); - } else { - RAY_LOG(WARNING) - << "Metadata of task " << task_id - << " not found in either GCS or lineage cache. It may have been evicted " - << "by the redis LRU configuration. Consider increasing the memory " - "allocation via " - << "ray.init(redis_max_memory=)."; - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, - JobID::Nil()); - } - })); + // Get the owner's address. + rpc::Address owner_addr; + bool has_owner = + task_dependency_manager_.GetOwnerAddress(required_object_id, &owner_addr); + if (has_owner) { + if (!RayConfig::instance().object_pinning_enabled()) { + // LRU eviction is enabled. The object may still be in scope, but we + // weren't able to fetch the value within the timeout, so the value has + // most likely been evicted. Mark the object as unreachable. + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, + JobID::Nil()); + } else { + RAY_LOG(DEBUG) << "Required object " << required_object_id + << " fetch timed out, asking owner " + << WorkerID::FromBinary(owner_addr.worker_id()); + // The owner's address exists. Poll the owner to check if the object is + // still in scope. If not, mark the object as failed. + // TODO(swang): If the owner has died, we could also mark the object as + // failed as soon as we hear about the owner's failure from the GCS, + // avoiding the raylet's reconstruction timeout. + auto client = std::unique_ptr( + new rpc::CoreWorkerClient(owner_addr, client_call_manager_)); + + rpc::GetObjectStatusRequest request; + request.set_object_id(required_object_id.Binary()); + request.set_owner_worker_id(owner_addr.worker_id()); + RAY_CHECK_OK(client->GetObjectStatus( + request, [this, required_object_id](Status status, + const rpc::GetObjectStatusReply &reply) { + if (!status.ok() || + reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE || + reply.status() == rpc::GetObjectStatusReply::FREED) { + // The owner is gone, or the owner replied that the object has + // gone out of scope (this is an edge case in the distributed ref + // counting protocol where a borrower dies before it can notify + // the owner of another borrower), or the object value has been + // freed. Store an error in the local plasma store so that an + // exception will be thrown when the worker tries to get the + // value. + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, + {required_object_id}, JobID::Nil()); + } + // Do nothing if the owner replied that the object is available. The + // object manager will continue trying to fetch the object, and this + // handler will get triggered again if the object is still + // unavailable after another timeout. + })); + } + } else { + // We do not have the owner's address. This is either an actor creation + // task or a randomly generated ObjectID. Try to look up the spec for the + // actor creation task. + // TODO(swang): The task lookup is only needed when the GCS actor service is + // disabled. Once the GCS actor service is enabled by default, we can + // immediately mark the object as failed if there is no ownership + // information. + RAY_LOG(DEBUG) << "Required object " << required_object_id + << " fetch timed out, checking task table"; + RAY_CHECK_OK( + gcs_client_->Tasks().AsyncGet( + task_id, + /*callback=*/ + [this, required_object_id, task_id]( + Status status, const boost::optional &task_data) { + if (task_data) { + // The task was in the GCS task table. Use the stored task spec to + // re-execute the task. + ResubmitTask(Task(task_data->task()), required_object_id); + return; + } + // The task was not in the GCS task table. It must therefore be in the + // lineage cache. + if (lineage_cache_.ContainsTask(task_id)) { + // Use a copy of the cached task spec to re-execute the task. + const Task task = lineage_cache_.GetTaskOrDie(task_id); + ResubmitTask(task, required_object_id); + } else { + // No actor creation task spec was found. This is most likely a + // randomly generated ObjectID whose value is unreachable. Mark the + // object as failed. + RAY_LOG(WARNING) + << "Ray cannot get the value of ObjectIDs that are generated " + "randomly (ObjectID.from_random()) or out-of-band " + "(ObjectID.from_binary(...)) because Ray " + "does not know which task will create them. " + "If this was not how your object ID was generated, please file an " + "issue " + "at https://github.com/ray-project/ray/issues/"; + MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, + {required_object_id}, JobID::Nil()); + } + })); + } } void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object_id) { RAY_LOG(DEBUG) << "Attempting to resubmit task " << task.GetTaskSpecification().TaskId(); - // Actors should only be recreated if the first initialization failed or if - // the most recent instance of the actor failed. - if (task.GetTaskSpecification().IsActorCreationTask()) { - const auto &actor_id = task.GetTaskSpecification().ActorCreationId(); - const auto it = actor_registry_.find(actor_id); - if (it != actor_registry_.end() && it->second.GetState() == ActorTableData::ALIVE) { - // If the actor is still alive, then do not resubmit the task. If the - // actor actually is dead and a result is needed, then reconstruction - // for this task will be triggered again. - RAY_LOG(WARNING) - << "Actor creation task resubmitted, but the actor is still alive."; - return; - } - } - - // Driver tasks cannot be reconstructed. If this is a driver task, push an - // error to the driver and do not resubmit it. - if (task.GetTaskSpecification().IsDriverTask()) { - // TODO(rkn): Define this constant somewhere else. - std::string type = "put_reconstruction"; - std::ostringstream error_message; - error_message << "The task with ID " << task.GetTaskSpecification().TaskId() - << " is a driver task and so the object created by ray.put " - << "could not be reconstructed."; - auto error_data_ptr = - gcs::CreateErrorTableData(type, error_message.str(), current_time_ms(), - task.GetTaskSpecification().JobId()); - RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); - MarkObjectsAsFailed(ErrorType::OBJECT_UNRECONSTRUCTABLE, {required_object_id}, - task.GetTaskSpecification().JobId()); + // All failure handling is handled by the owner, except for actor creation + // tasks. + if (!task.GetTaskSpecification().IsActorCreationTask()) { return; } - RAY_LOG(INFO) << "Resubmitting task " << task.GetTaskSpecification().TaskId() - << " on node " << self_node_id_; + // When the GCS is disabled, the raylet is responsible for restarting the actor. + if (RayConfig::instance().gcs_actor_service_enabled()) { + return; + } + + // Actors should only be recreated if the first initialization failed or if + // the most recent instance of the actor failed. + const auto &actor_id = task.GetTaskSpecification().ActorCreationId(); + const auto it = actor_registry_.find(actor_id); + if (it != actor_registry_.end() && it->second.GetState() == ActorTableData::ALIVE) { + // If the actor is still alive, then do not resubmit the task. If the + // actor actually is dead and a result is needed, then reconstruction + // for this task will be triggered again. + RAY_LOG(WARNING) << "Actor creation task resubmitted, but the actor is still alive."; + return; + } + + RAY_LOG(INFO) << "Resubmitting actor creation task " + << task.GetTaskSpecification().TaskId() << " on node " << self_node_id_; // The task may be reconstructed. Submit it with an empty lineage, since any // uncommitted lineage must already be in the lineage cache. At this point, // the task should not yet exist in the local scheduling queue. If it does, diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index ab6c1f26e..0a1fe7fc2 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -92,7 +92,7 @@ std::vector TaskDependencyManager::HandleObjectLocal( const ray::ObjectID &object_id) { // Add the object to the table of locally available objects. auto inserted = local_objects_.insert(object_id); - RAY_CHECK(inserted.second); + RAY_CHECK(inserted.second) << object_id; // Find all tasks and workers that depend on the newly available object. std::vector ready_task_ids; @@ -513,6 +513,22 @@ void TaskDependencyManager::RecordMetrics() const { pending_tasks_.size(), {{stats::ValueTypeKey, "num_pending_tasks"}}); } +bool TaskDependencyManager::GetOwnerAddress(const ObjectID &object_id, + rpc::Address *owner_address) const { + const auto creating_task_entry = required_tasks_.find(object_id.TaskId()); + if (creating_task_entry == required_tasks_.end()) { + return false; + } + + const auto it = creating_task_entry->second.find(object_id); + if (it == creating_task_entry->second.end()) { + return false; + } + + *owner_address = it->second.owner_address; + return !owner_address->worker_id().empty(); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 85cc7e56b..33999700f 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -164,6 +164,17 @@ class TaskDependencyManager { /// Record metrics. void RecordMetrics() const; + /// Get the address of the owner of this object. An address will only be + /// returned if the caller previously specified that this object is required + /// on this node, through a call to SubscribeGetDependencies or + /// SubscribeWaitDependencies. + /// + /// \param[in] object_id The object whose owner to get. + /// \param[out] owner_address The address of the object's owner, if + /// available. + /// \return True if we have owner information for the object. + bool GetOwnerAddress(const ObjectID &object_id, rpc::Address *owner_address) const; + private: struct ObjectDependencies { ObjectDependencies(const rpc::ObjectReference &ref)