From 2be1ccbd8fb0abd3045ba8487c2d356ce9f582d2 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 27 Jul 2018 16:53:30 -0700 Subject: [PATCH] Raise application-level exceptions for some failure scenarios. (#2429) * Raise application level exception for actor methods that can't be executed and failed tasks. * Retry task forwarding for actor tasks. * Small cleanups * Move constant to ray_config. * Create ForwardTaskOrResubmit method. * Minor * Clean up queued tasks for dead actors. * Some cleanups. * Linting * Notify task_dependency_manager_ about failed tasks. * Manage timer lifetime better. * Use smart pointers to deallocate the timer. * Fix * add comment --- src/common/state/ray_config.h | 9 ++ src/ray/object_manager/object_manager.cc | 14 +- src/ray/object_manager/object_manager.h | 6 +- .../test/object_manager_stress_test.cc | 8 +- .../test/object_manager_test.cc | 8 +- src/ray/raylet/actor_registration.cc | 5 + src/ray/raylet/actor_registration.h | 15 +- src/ray/raylet/main.cc | 1 + src/ray/raylet/node_manager.cc | 153 ++++++++++++++++-- src/ray/raylet/node_manager.h | 43 +++++ .../raylet/object_manager_integration_test.cc | 2 +- test/failure_test.py | 9 -- 12 files changed, 229 insertions(+), 44 deletions(-) diff --git a/src/common/state/ray_config.h b/src/common/state/ray_config.h index 5253f79fb..df2f12554 100644 --- a/src/common/state/ray_config.h +++ b/src/common/state/ray_config.h @@ -88,6 +88,10 @@ class RayConfig { return actor_creation_num_spillbacks_warning_; } + int node_manager_forward_task_retry_timeout_milliseconds() const { + return node_manager_forward_task_retry_timeout_milliseconds_; + } + int object_manager_pull_timeout_ms() const { return object_manager_pull_timeout_ms_; } @@ -131,6 +135,7 @@ class RayConfig { L3_cache_size_bytes_(100000000), max_tasks_to_spillback_(10), actor_creation_num_spillbacks_warning_(100), + node_manager_forward_task_retry_timeout_milliseconds_(1000), // TODO: Setting this to large values results in latency, which needs to // be addressed. This timeout is often on the critical path for object // transfers. @@ -226,6 +231,10 @@ class RayConfig { /// a value of 100 corresponds to a warning every 10 seconds. int64_t actor_creation_num_spillbacks_warning_; + /// If a node manager attempts to forward a task to another node manager and + /// the forward fails, then it will resubmit the task after this duration. + int64_t node_manager_forward_task_retry_timeout_milliseconds_; + /// Timeout, in milliseconds, to wait before retrying a failed pull in the /// ObjectManager. int object_manager_pull_timeout_ms_; diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 4f1568cdd..493461f49 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -96,8 +96,7 @@ void ObjectManager::NotifyDirectoryObjectAdd(const ObjectInfoT &object_info) { if (iter != unfulfilled_push_requests_.end()) { for (auto &pair : iter->second) { auto &client_id = pair.first; - main_service_->post( - [this, object_id, client_id]() { RAY_CHECK_OK(Push(object_id, client_id)); }); + main_service_->post([this, object_id, client_id]() { Push(object_id, client_id); }); // When push timeout is set to -1, there will be an empty timer in pair.second. if (pair.second != nullptr) { pair.second->cancel(); @@ -226,7 +225,7 @@ void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id, } } -ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { +void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { if (local_objects_.count(object_id) == 0) { // Avoid setting duplicated timer for the same object and client pair. auto &clients = unfulfilled_push_requests_[object_id]; @@ -256,12 +255,12 @@ ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &clien clients.emplace(client_id, std::move(timer)); } } - return ray::Status::OK(); + return; } // TODO(hme): Cache this data in ObjectDirectory. // Okay for now since the GCS client caches this data. - Status status = object_directory_->GetInformation( + RAY_CHECK_OK(object_directory_->GetInformation( client_id, [this, object_id, client_id](const RemoteConnectionInfo &info) { const ObjectInfoT &object_info = local_objects_[object_id]; @@ -279,8 +278,7 @@ ray::Status ObjectManager::Push(const ObjectID &object_id, const ClientID &clien }, [](const Status &status) { // Push is best effort, so do nothing here. - }); - return status; + })); } void ObjectManager::ExecuteSendObject(const ClientID &client_id, @@ -595,7 +593,7 @@ void ObjectManager::ReceivePullRequest(std::shared_ptr &con auto pr = flatbuffers::GetRoot(message); ObjectID object_id = ObjectID::from_binary(pr->object_id()->str()); ClientID client_id = ClientID::from_binary(pr->client_id()->str()); - ray::Status push_status = Push(object_id, client_id); + Push(object_id, client_id); conn->ProcessMessages(); } diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index c58b6bd85..1ccd2454e 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -38,7 +38,7 @@ struct ObjectManagerConfig { int max_receives; /// Object chunk size, in bytes uint64_t object_chunk_size; - /// The stored socked name. + /// The store socket name. std::string store_socket_name; /// The time in milliseconds to wait until a Push request /// fails due to unsatisfied local object. Special value: @@ -101,8 +101,8 @@ class ObjectManager : public ObjectManagerInterface { /// /// \param object_id The object's object id. /// \param client_id The remote node's client id. - /// \return Status of whether the push request successfully initiated. - ray::Status Push(const ObjectID &object_id, const ClientID &client_id); + /// \return Void. + void Push(const ObjectID &object_id, const ClientID &client_id); /// Pull an object from ClientID. Returns UniqueID asociated with /// an invocation of this method. diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index dad617df4..9e6e91bc0 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -362,21 +362,21 @@ class StressTestObjectManager : public TestObjectManagerBase { case TransferPattern::PUSH_A_B: { for (int i = -1; ++i < num_trials;) { ObjectID oid1 = WriteDataToClient(client1, data_size); - status = server1->object_manager_.Push(oid1, client_id_2); + server1->object_manager_.Push(oid1, client_id_2); } } break; case TransferPattern::PUSH_B_A: { for (int i = -1; ++i < num_trials;) { ObjectID oid2 = WriteDataToClient(client2, data_size); - status = server2->object_manager_.Push(oid2, client_id_1); + server2->object_manager_.Push(oid2, client_id_1); } } break; case TransferPattern::BIDIRECTIONAL_PUSH: { for (int i = -1; ++i < num_trials;) { ObjectID oid1 = WriteDataToClient(client1, data_size); - status = server1->object_manager_.Push(oid1, client_id_2); + server1->object_manager_.Push(oid1, client_id_2); ObjectID oid2 = WriteDataToClient(client2, data_size); - status = server2->object_manager_.Push(oid2, client_id_1); + server2->object_manager_.Push(oid2, client_id_1); } } break; case TransferPattern::PULL_A_B: { diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index efb6a1235..c411c00ab 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -246,14 +246,14 @@ class TestObjectManager : public TestObjectManagerBase { // dummy_id is not local. The push function will timeout. ObjectID dummy_id = ObjectID::from_random(); - status = server1->object_manager_.Push( - dummy_id, gcs_client_2->client_table().GetLocalClientId()); + server1->object_manager_.Push(dummy_id, + gcs_client_2->client_table().GetLocalClientId()); created_object_id1 = ObjectID::from_random(); WriteDataToClient(client1, data_size, created_object_id1); // Server1 holds Object1 so this Push call will success. - status = server1->object_manager_.Push( - created_object_id1, gcs_client_2->client_table().GetLocalClientId()); + server1->object_manager_.Push(created_object_id1, + gcs_client_2->client_table().GetLocalClientId()); // This timer is used to guarantee that the Push function for dummy_id will timeout. timer.reset(new boost::asio::deadline_timer(main_service)); diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index efb5699e6..ced923609 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -8,6 +8,7 @@ namespace raylet { ActorRegistration::ActorRegistration(const ActorTableDataT &actor_table_data) : actor_table_data_(actor_table_data), + alive_(true), execution_dependency_(ObjectID::nil()), frontier_() {} @@ -36,6 +37,10 @@ void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, execution_dependency_ = execution_dependency; } +bool ActorRegistration::IsAlive() const { return alive_; } + +void ActorRegistration::MarkDead() { alive_ = false; } + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index e5721b686..bd16e4f46 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -14,7 +14,8 @@ namespace raylet { /// /// Information about an actor registered in the system. This includes the /// actor's current node manager location, and if local, information about its -/// current execution state, used for reconstruction purposes. +/// current execution state, used for reconstruction purposes, and whether the +/// actor is currently alive or not. class ActorRegistration { public: /// Create an actor registration. @@ -74,10 +75,22 @@ class ActorRegistration { void ExtendFrontier(const ActorHandleID &handle_id, const ObjectID &execution_dependency); + /// Return whether the actor is alive or not. This should only be called on + /// local actors. + /// + /// \return True if the local actor is alive and false if it is dead. + bool IsAlive() const; + + /// Mark the actor as dead. + /// \return Void. + void MarkDead(); + private: /// Information from the global actor table about this actor, including the /// node manager location. ActorTableDataT actor_table_data_; + /// True if the actor is alive and false otherwise. + bool alive_; /// The object representing the state following the actor's most recently /// executed task. The next task to execute on the actor should be marked as /// execution-dependent on this object. diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 67d30987e..d3cdef460 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -48,6 +48,7 @@ int main(int argc, char *argv[]) { node_manager_config.heartbeat_period_ms = RayConfig::instance().heartbeat_timeout_milliseconds(); node_manager_config.max_lineage_size = RayConfig::instance().max_lineage_size(); + node_manager_config.store_socket_name = store_socket_name; // Configuration for the object manager. ray::ObjectManagerConfig object_manager_config; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c769d3329..b4d225863 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -111,6 +111,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); + + ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str(), "", 0)); } ray::Status NodeManager::RegisterGcs() { @@ -359,6 +361,38 @@ void NodeManager::HandleActorCreation(const ActorID &actor_id, } } +void NodeManager::GetActorTasksFromList(const ActorID &actor_id, + const std::list &tasks, + std::unordered_set &tasks_to_remove) { + for (auto const &task : tasks) { + auto const &spec = task.GetTaskSpecification(); + if (actor_id == spec.ActorId()) { + tasks_to_remove.insert(spec.TaskId()); + } + } +} + +void NodeManager::CleanUpTasksForDeadActor(const ActorID &actor_id) { + // TODO(rkn): The code below should be cleaned up when we improve the + // SchedulingQueue API. + std::unordered_set tasks_to_remove; + + GetActorTasksFromList(actor_id, local_queues_.GetUncreatedActorMethods(), + tasks_to_remove); + GetActorTasksFromList(actor_id, local_queues_.GetWaitingTasks(), tasks_to_remove); + GetActorTasksFromList(actor_id, local_queues_.GetPlaceableTasks(), tasks_to_remove); + GetActorTasksFromList(actor_id, local_queues_.GetReadyTasks(), tasks_to_remove); + GetActorTasksFromList(actor_id, local_queues_.GetRunningTasks(), tasks_to_remove); + GetActorTasksFromList(actor_id, local_queues_.GetBlockedTasks(), tasks_to_remove); + + auto removed_tasks = local_queues_.RemoveTasks(tasks_to_remove); + for (auto const &task : removed_tasks) { + const TaskSpecification &spec = task.GetTaskSpecification(); + TreatTaskAsFailed(spec); + task_dependency_manager_.TaskCanceled(spec.TaskId()); + } +} + void NodeManager::ProcessNewClient(LocalClientConnection &client) { // The new client is a worker, so begin listening for messages. client.ProcessMessages(); @@ -437,7 +471,8 @@ void NodeManager::ProcessClientMessage( }); RAY_CHECK(running_tasks.size() != 0); RAY_CHECK(it != running_tasks.end()); - JobID job_id = it->GetTaskSpecification().DriverId(); + const TaskSpecification &spec = it->GetTaskSpecification(); + const JobID job_id = spec.DriverId(); // TODO(rkn): Define this constant somewhere else. std::string type = "worker_died"; std::ostringstream error_message; @@ -445,10 +480,30 @@ void NodeManager::ProcessClientMessage( << "."; RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( job_id, type, error_message.str(), current_time_ms())); + + // Handle the task failure in order to raise an exception in the + // application. + TreatTaskAsFailed(spec); + task_dependency_manager_.TaskCanceled(spec.TaskId()); + local_queues_.RemoveTask(spec.TaskId()); } worker_pool_.DisconnectWorker(worker); + // If the worker was an actor, add it to the list of dead actors. + const ActorID actor_id = worker->GetActorId(); + if (!actor_id.is_nil()) { + // TODO(rkn): Consider broadcasting a message to all of the other + // node managers so that they can mark the actor as dead. + RAY_LOG(DEBUG) << "The actor with ID " << actor_id << " died."; + auto actor_entry = actor_registry_.find(actor_id); + RAY_CHECK(actor_entry != actor_registry_.end()); + actor_entry->second.MarkDead(); + // For dead actors, if there are remaining tasks for this actor, we + // should handle them. + CleanUpTasksForDeadActor(actor_id); + } + const ClientID &client_id = gcs_client_->client_table().GetLocalClientId(); // Return the resources that were being used by this worker. @@ -652,15 +707,16 @@ void NodeManager::ScheduleTasks() { std::unordered_set local_task_ids; // Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node. for (const auto &task_schedule : policy_decision) { - TaskID task_id = task_schedule.first; - ClientID client_id = task_schedule.second; + const TaskID task_id = task_schedule.first; + const ClientID client_id = task_schedule.second; if (client_id == gcs_client_->client_table().GetLocalClientId()) { local_task_ids.insert(task_id); } else { // TODO(atumanov): need a better interface for task exit on forward. const auto task = local_queues_.RemoveTask(task_id); - // TODO(swang): Handle forward task failure. - RAY_CHECK_OK(ForwardTask(task, client_id)); + // Attempt to forward the task. If this fails to forward the task, + // the task will be resubmit locally. + ForwardTaskOrResubmit(task, client_id); } } @@ -673,6 +729,34 @@ void NodeManager::ScheduleTasks() { } } +void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) { + RAY_LOG(DEBUG) << "Treating task " << spec.TaskId() << " as failed."; + // Loop over the return IDs (except the dummy ID) and store a fake object in + // the object store. + int64_t num_returns = spec.NumReturns(); + if (spec.IsActorTask()) { + // TODO(rkn): We subtract 1 to avoid the dummy ID. However, this leaks + // information about the TaskSpecification implementation. + num_returns -= 1; + } + for (int64_t i = 0; i < num_returns; i++) { + const ObjectID object_id = spec.ReturnId(i); + + std::shared_ptr data; + // TODO(ekl): this writes an invalid arrow object, which is sufficient to + // signal that the worker failed, but it would be nice to return more + // detailed failure metadata in the future. + arrow::Status status = + store_client_.Create(object_id.to_plasma_id(), 1, NULL, 0, &data); + if (!status.IsPlasmaObjectExists()) { + // TODO(rkn): We probably don't want this checks. E.g., if the object + // store is full, we don't want to kill the raylet. + ARROW_CHECK_OK(status); + ARROW_CHECK_OK(store_client_.Seal(object_id.to_plasma_id())); + } + } +} + void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage, bool forwarded) { // Add the task and its uncommitted lineage to the lineage cache. @@ -690,13 +774,26 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag // We have a known location for the actor. auto node_manager_id = actor_entry->second.GetNodeManagerId(); if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) { - // The actor is local. Queue the task for local execution, bypassing placement. - EnqueuePlaceableTask(task); + // The actor is local. Check if the actor is still alive. + if (!actor_entry->second.IsAlive()) { + // Handle the fact that this actor is dead. + TreatTaskAsFailed(spec); + } else { + // Queue the task for local execution, bypassing placement. + EnqueuePlaceableTask(task); + } } else { // The actor is remote. Forward the task to the node manager that owns // the actor. - // TODO(swang): Handle forward task failure. - RAY_CHECK_OK(ForwardTask(task, node_manager_id)); + if (removed_clients_.find(node_manager_id) != removed_clients_.end()) { + // The remote node manager is dead, so handle the fact that this actor + // is also dead. + TreatTaskAsFailed(spec); + } else { + // Attempt to forward the task. If this fails to forward the task, + // the task will be resubmit locally. + ForwardTaskOrResubmit(task, node_manager_id); + } } } else { // We do not have a registered location for the object, so either the @@ -1033,6 +1130,35 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) { } } +void NodeManager::ForwardTaskOrResubmit(const Task &task, + const ClientID &node_manager_id) { + /// TODO(rkn): Should we check that the node manager is remote and not local? + /// TODO(rkn): Should we check if the remote node manager is known to be dead? + const TaskID task_id = task.GetTaskSpecification().TaskId(); + + // Attempt to forward the task. + if (!ForwardTask(task, node_manager_id).ok()) { + RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager " + << node_manager_id; + + // Create a timer to resubmit the task in a little bit. TODO(rkn): Really + // this should be a unique_ptr instead of a shared_ptr. However, it's a + // little harder to move unique_ptrs into lambdas. + auto retry_timer = std::make_shared(io_service_); + auto retry_duration = boost::posix_time::milliseconds( + RayConfig::instance().node_manager_forward_task_retry_timeout_milliseconds()); + retry_timer->expires_from_now(retry_duration); + retry_timer->async_wait( + [this, task, task_id, retry_timer](const boost::system::error_code &error) { + // Timer killing will receive the boost::asio::error::operation_aborted, + // we only handle the timeout event. + RAY_CHECK(!error); + RAY_LOG(INFO) << "In ForwardTask retry callback for task " << task_id; + EnqueuePlaceableTask(task); + }); + } +} + ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) { const auto &spec = task.GetTaskSpecification(); auto task_id = spec.TaskId(); @@ -1086,16 +1212,15 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id) ObjectID argument_id = spec.ArgId(i, j); // If the argument is local, then push it to the receiving node. if (task_dependency_manager_.CheckObjectLocal(argument_id)) { - RAY_CHECK_OK(object_manager_.Push(argument_id, node_id)); + object_manager_.Push(argument_id, node_id); } } } } } else { - // TODO(atumanov): caller must handle ForwardTask failure to ensure tasks are not - // lost. - RAY_LOG(FATAL) << "[NodeManager][ForwardTask] failed to forward task " << task_id - << " to node " << node_id; + // TODO(atumanov): caller must handle ForwardTask failure. + RAY_LOG(WARNING) << "[NodeManager][ForwardTask] failed to forward task " << task_id + << " to node " << node_id; } return status; } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index feb816d0d..d8c180ba3 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -26,6 +26,8 @@ struct NodeManagerConfig { std::vector worker_command; uint64_t heartbeat_period_ms; uint64_t max_lineage_size; + /// The store socket name. + std::string store_socket_name; }; class NodeManager { @@ -73,6 +75,14 @@ class NodeManager { /// Methods for task scheduling. /// Enqueue a placeable task to wait on object dependencies or be ready for dispatch. void EnqueuePlaceableTask(const Task &task); + /// This will treat the task as if it had been executed and failed. This is + /// done by looping over the task return IDs and for each ID storing an object + /// that represents a failure in the object store. When clients retrieve these + /// objects, they will raise application-level exceptions. + /// + /// \param spec The specification of the task. + /// \return Void. + void TreatTaskAsFailed(const TaskSpecification &spec); /// Handle specified task's submission to the local node manager. void SubmitTask(const Task &task, const Lineage &uncommitted_lineage, bool forwarded = false); @@ -84,6 +94,12 @@ class NodeManager { void ScheduleTasks(); /// Resubmit a task whose return value needs to be reconstructed. void ResubmitTask(const TaskID &task_id); + /// Attempt to forward a task to a remote different node manager. If this + /// fails, the task will be resubmit locally. + /// + /// \param task The task in question. + /// \param node_manager_id The ID of the remote node manager. + void ForwardTaskOrResubmit(const Task &task, const ClientID &node_manager_id); /// Forward a task to another node to execute. The task is assumed to not be /// queued in local_queues_. ray::Status ForwardTask(const Task &task, const ClientID &node_id); @@ -100,6 +116,27 @@ class NodeManager { void HandleActorCreation(const ActorID &actor_id, const std::vector &data); + /// TODO(rkn): This should probably be removed when we improve the + /// SchedulingQueue API. This is a helper function for + /// CleanUpTasksForDeadActor. + /// + /// This essentially loops over all of the tasks in the provided list and + /// finds The IDs of the tasks that belong to the given actor. + /// + /// \param actor_id The actor to get the tasks for. + /// \param tasks A list of tasks to extract from. + /// \param tasks_to_remove The task IDs of the extracted tasks are inserted in + /// this vector. + void GetActorTasksFromList(const ActorID &actor_id, const std::list &tasks, + std::unordered_set &tasks_to_remove); + + /// When an actor dies, loop over all of the queued tasks for that actor and + /// treat them as failed. + /// + /// \param actor_id The actor that died. + /// \return Void. + void CleanUpTasksForDeadActor(const ActorID &actor_id); + /// Methods for managing object dependencies. /// Handle a dependency required by a queued task that is missing locally. /// The dependency is (1) on a remote node, (2) pending creation on a remote @@ -117,6 +154,10 @@ class NodeManager { boost::asio::io_service &io_service_; ObjectManager &object_manager_; + /// A Plasma object store client. This is used exclusively for creating new + /// objects in the object store (e.g., for actor tasks that can't be run + /// because the actor died). + plasma::PlasmaClient store_client_; /// A client connection to the GCS. std::shared_ptr gcs_client_; boost::asio::deadline_timer heartbeat_timer_; @@ -143,6 +184,8 @@ class NodeManager { /// this could grow unbounded. std::unordered_set removed_clients_; std::unordered_map remote_server_connections_; + /// A mapping from actor ID to registration information about that actor + /// (including which node manager owns it). std::unordered_map actor_registry_; }; diff --git a/src/ray/raylet/object_manager_integration_test.cc b/src/ray/raylet/object_manager_integration_test.cc index 24333b4de..b9af292c1 100644 --- a/src/ray/raylet/object_manager_integration_test.cc +++ b/src/ray/raylet/object_manager_integration_test.cc @@ -174,7 +174,7 @@ class TestObjectManagerIntegration : public TestObjectManagerBase { num_expected_objects = (uint)1; ObjectID oid1 = WriteDataToClient(client1, data_size); - status = server1->object_manager_.Push(oid1, client_id_2); + server1->object_manager_.Push(oid1, client_id_2); } void TestPushComplete() { diff --git a/test/failure_test.py b/test/failure_test.py index bc74b78d0..ee1069401 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -300,9 +300,6 @@ class WorkerDeath(unittest.TestCase): self.assertIn("died or was killed while executing", ray.error_info()[0]["message"]) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorWorkerDying(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -321,9 +318,6 @@ class WorkerDeath(unittest.TestCase): self.assertRaises(Exception, lambda: ray.get(consume.remote(obj))) wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorWorkerDyingFutureTasks(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE) @@ -346,9 +340,6 @@ class WorkerDeath(unittest.TestCase): wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorWorkerDyingNothingInProgress(self): ray.init(num_workers=0, driver_mode=ray.SILENT_MODE)