diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index cdd0ff6ba..ca94344a0 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -149,10 +149,11 @@ def wait_for_condition(condition_predictor, Whether the condition is met within the timeout. """ time_elapsed = 0 + start = time.time() while time_elapsed <= timeout_ms: if condition_predictor(): return True - time_elapsed += retry_interval_ms + time_elapsed = (time.time() - start) * 1000 time.sleep(retry_interval_ms / 1000.0) return False diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 2a485207e..ced05d1be 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -15,6 +15,7 @@ import ray.ray_constants as ray_constants from ray.cluster_utils import Cluster from ray.test_utils import ( relevant_errors, + wait_for_condition, wait_for_errors, RayTestTimeoutException, ) @@ -991,6 +992,84 @@ def test_serialized_id(ray_start_cluster): ray.get(get.remote([obj], True)) +def test_fate_sharing(ray_start_cluster): + config = json.dumps({ + "num_heartbeats_timeout": 10, + "raylet_heartbeat_timeout_milliseconds": 100, + }) + cluster = Cluster() + # Head node with no resources. + cluster.add_node(num_cpus=0, _internal_config=config) + # Node to place the parent actor. + node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1}) + # Node to place the child actor. + cluster.add_node(num_cpus=1, resources={"child": 1}) + cluster.wait_for_nodes() + ray.init(address=cluster.address) + + @ray.remote + def sleep(): + time.sleep(1000) + + @ray.remote(resources={"child": 1}) + def probe(): + return + + @ray.remote + class Actor(object): + def __init__(self): + return + + def start_child(self, use_actors): + if use_actors: + child = Actor.options(resources={"child": 1}).remote() + ray.get(child.sleep.remote()) + else: + ray.get(sleep.options(resources={"child": 1}).remote()) + + def sleep(self): + time.sleep(1000) + + def get_pid(self): + return os.getpid() + + # Returns whether the "child" resource is available. + def child_resource_available(): + p = probe.remote() + ready, _ = ray.wait([p], timeout=1) + return len(ready) > 0 + + # Test fate sharing if the parent process dies. + def test_process_failure(use_actors): + a = Actor.options(resources={"parent": 1}).remote() + pid = ray.get(a.get_pid.remote()) + a.start_child.remote(use_actors=use_actors) + # Wait for the child to be scheduled. + assert wait_for_condition( + lambda: not child_resource_available(), timeout_ms=10000) + # Kill the parent process. + os.kill(pid, 9) + assert wait_for_condition(child_resource_available, timeout_ms=10000) + + # Test fate sharing if the parent node dies. + def test_node_failure(node_to_kill, use_actors): + a = Actor.options(resources={"parent": 1}).remote() + a.start_child.remote(use_actors=use_actors) + # Wait for the child to be scheduled. + assert wait_for_condition( + lambda: not child_resource_available(), timeout_ms=10000) + # Kill the parent process. + cluster.remove_node(node_to_kill, allow_graceful=False) + node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1}) + assert wait_for_condition(child_resource_available, timeout_ms=10000) + return node_to_kill + + test_process_failure(use_actors=True) + test_process_failure(use_actors=False) + node_to_kill = test_node_failure(node_to_kill, use_actors=True) + node_to_kill = test_node_failure(node_to_kill, use_actors=False) + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 8005e2971..ddfed6682 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -74,7 +74,9 @@ class BaseID { protected: BaseID(const std::string &binary) { - std::memcpy(const_cast(this->Data()), binary.data(), T::Size()); + RAY_CHECK(binary.size() == Size() || binary.size() == 0) + << "expected size is " << Size() << ", but got " << binary.size(); + std::memcpy(const_cast(this->Data()), binary.data(), binary.size()); } // All IDs are immutable for hash evaluations. MutableData is only allow to use // in construction time, so this function is protected. @@ -389,7 +391,9 @@ std::ostream &operator<<(std::ostream &os, const ObjectID &id); \ private: \ explicit type(const std::string &binary) { \ - std::memcpy(&id_, binary.data(), kUniqueIDSize); \ + RAY_CHECK(binary.size() == Size() || binary.size() == 0) \ + << "expected size is " << Size() << ", but got " << binary.size(); \ + std::memcpy(&id_, binary.data(), binary.size()); \ } \ }; @@ -416,10 +420,10 @@ T BaseID::FromRandom() { template T BaseID::FromBinary(const std::string &binary) { - RAY_CHECK(binary.size() == T::Size()) + RAY_CHECK(binary.size() == T::Size() || binary.size() == 0) << "expected size is " << T::Size() << ", but got " << binary.size(); T t = T::Nil(); - std::memcpy(t.MutableData(), binary.data(), T::Size()); + std::memcpy(t.MutableData(), binary.data(), binary.size()); return t; } diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index ca31f37f8..4aea8b4bf 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -185,6 +185,10 @@ TaskID TaskSpecification::CallerId() const { return TaskID::FromBinary(message_->caller_id()); } +const rpc::Address &TaskSpecification::CallerAddress() const { + return message_->caller_address(); +} + // === Below are getter methods specific to actor tasks. ActorID TaskSpecification::ActorId() const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 2c41d6fee..9f4f7d1ee 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -147,6 +147,8 @@ class TaskSpecification : public MessageWrapper { TaskID CallerId() const; + const rpc::Address &CallerAddress() const; + uint64_t ActorCounter() const; ObjectID ActorCreationDummyObjectId() const; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fbc06a238..5d5c47610 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -198,7 +198,7 @@ ray::Status NodeManager::RegisterGcs() { // in their rpc::Address to the ID of a failed raylet. const auto &worker_failure_handler = [this](const WorkerID &id, const gcs::WorkerFailureData &worker_failure_data) { - HandleUnexpectedWorkerFailure(id, worker_failure_data); + HandleUnexpectedWorkerFailure(worker_failure_data.worker_address()); }; RAY_CHECK_OK(gcs_client_->Workers().AsyncSubscribeToWorkerFailures( worker_failure_handler, /*done_callback=*/nullptr)); @@ -222,12 +222,6 @@ ray::Status NodeManager::RegisterGcs() { return ray::Status::OK(); } -void NodeManager::HandleUnexpectedWorkerFailure( - const WorkerID &worker_id, const gcs::WorkerFailureData &worker_failed_data) { - RAY_LOG(DEBUG) << "Worker " << worker_id << " failed"; - // TODO: Clean up after the failure: If the failed worker is our owner, then exit. -} - void NodeManager::KillWorker(std::shared_ptr worker) { #ifdef _WIN32 // TODO(mehrdadn): Implement implement graceful process termination mechanism @@ -527,6 +521,54 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) { // guarantee that all tasks get flushed eventually, in case one of the tasks // in our local cache was supposed to be flushed by the node that died. lineage_cache_.FlushAllUncommittedTasks(); + + // Clean up workers that were owned by processes that were on the failed + // node. + rpc::Address address; + address.set_raylet_id(node_info.node_id()); + HandleUnexpectedWorkerFailure(address); +} + +void NodeManager::HandleUnexpectedWorkerFailure(const rpc::Address &address) { + const WorkerID worker_id = WorkerID::FromBinary(address.worker_id()); + const ClientID node_id = ClientID::FromBinary(address.raylet_id()); + if (!worker_id.IsNil()) { + RAY_LOG(DEBUG) << "Worker " << worker_id << " failed"; + failed_workers_cache_.insert(worker_id); + } else { + RAY_CHECK(!node_id.IsNil()); + failed_nodes_cache_.insert(node_id); + } + + // TODO(swang): Also clean up any lease requests owned by the failed worker + // from the task queues. This is only necessary for lease requests that are + // infeasible, since requests that are fulfilled will get canceled during + // dispatch. + for (const auto &pair : leased_workers_) { + auto &worker = pair.second; + const auto owner_worker_id = + WorkerID::FromBinary(worker->GetOwnerAddress().worker_id()); + const auto owner_node_id = + WorkerID::FromBinary(worker->GetOwnerAddress().raylet_id()); + RAY_LOG(DEBUG) << "Lease " << worker->WorkerId() << " owned by " << owner_worker_id; + RAY_CHECK(!owner_worker_id.IsNil() && !owner_node_id.IsNil()); + if (!worker->IsDetachedActor()) { + if (!worker_id.IsNil()) { + // If the failed worker was a leased worker's owner, then kill the leased worker. + if (owner_worker_id == worker_id) { + RAY_LOG(INFO) << "Owner process " << owner_worker_id + << " died, killing leased worker " << worker->WorkerId(); + KillWorker(worker); + } + } else if (owner_node_id == node_id) { + // If the leased worker's owner was on the failed node, then kill the leased + // worker. + RAY_LOG(INFO) << "Owner node " << owner_node_id << " died, killing leased worker " + << worker->WorkerId(); + KillWorker(worker); + } + } + } } void NodeManager::ResourceCreateUpdated(const ClientID &client_id, @@ -1578,11 +1620,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest // task directly on the worker. RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId(); TaskID task_id = task.GetTaskSpecification().TaskId(); + rpc::Address owner_address = task.GetTaskSpecification().CallerAddress(); task.OnDispatchInstead( - [this, task_id, reply, send_reply_callback]( + [this, owner_address, reply, send_reply_callback]( const std::shared_ptr granted, const std::string &address, int port, const WorkerID &worker_id, const ResourceIdSet &resource_ids) { - RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id; reply->mutable_worker_address()->set_ip_address(address); reply->mutable_worker_address()->set_port(port); reply->mutable_worker_address()->set_worker_id(worker_id.Binary()); @@ -1603,12 +1645,11 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest } send_reply_callback(Status::OK(), nullptr, nullptr); - // TODO(swang): Kill worker if other end hangs up. - // TODO(swang): Implement a lease term by which the owner needs to return the - // worker. RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()) << "Worker is already leased out " << worker_id; - leased_workers_[worker_id] = std::static_pointer_cast(granted); + + auto worker = std::static_pointer_cast(granted); + leased_workers_[worker_id] = worker; }); task.OnSpillbackInstead( [reply, task_id, send_reply_callback](const ClientID &spillback_to, @@ -1627,6 +1668,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, rpc::SendReplyCallback send_reply_callback) { // Read the resource spec submitted by the client. auto worker_id = WorkerID::FromBinary(request.worker_id()); + RAY_LOG(DEBUG) << "Return worker " << worker_id; std::shared_ptr worker = leased_workers_[worker_id]; if (new_scheduler_enabled_) { @@ -2319,10 +2361,27 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & if (task.GetTaskSpecification().IsDetachedActor()) { worker->MarkDetachedActor(); } + + const auto owner_worker_id = WorkerID::FromBinary(spec.CallerAddress().worker_id()); + const auto owner_node_id = ClientID::FromBinary(spec.CallerAddress().raylet_id()); + RAY_CHECK(!owner_worker_id.IsNil()); + RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id << " to worker " + << worker->WorkerId() << ", owner ID " << owner_worker_id; + task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(), worker->WorkerId(), spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds() : worker->GetTaskResourceIds()); + + // If the owner has died since this task was queued, cancel the task by + // killing the worker. + if (failed_workers_cache_.count(owner_worker_id) > 0 || + failed_nodes_cache_.count(owner_node_id) > 0) { + // TODO(swang): Skip assigning this task to this worker instead of + // killing the worker? + KillWorker(worker); + } + post_assign_callbacks->push_back([this, worker, task_id]() { RAY_LOG(DEBUG) << "Finished assigning task " << task_id << " to worker " << worker->WorkerId(); @@ -2386,6 +2445,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { // direct actor creation calls because this ID is used later if the actor // requires objects from plasma. worker.AssignTaskId(TaskID::Nil()); + worker.SetOwnerAddress(rpc::Address()); } // Direct actors will be assigned tasks via the core worker and therefore are // not idle. @@ -2965,6 +3025,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr &worker, auto spec = assigned_task.GetTaskSpecification(); // We successfully assigned the task to the worker. worker->AssignTaskId(spec.TaskId()); + worker->SetOwnerAddress(spec.CallerAddress()); worker->AssignJobId(spec.JobId()); // TODO(swang): For actors with multiple actor handles, to // guarantee that tasks are replayed in the same order after a diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d6bf6a492..fa863fb7a 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -121,10 +121,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Handle an unexpected failure notification from GCS pubsub. /// - /// \param worker_id The ID of the failed worker. - /// \param worker_data Data associated with the worker failure. - void HandleUnexpectedWorkerFailure(const WorkerID &worker_id, - const gcs::WorkerFailureData &worker_failed_data); + /// \param worker_address The address of the worker that died. + void HandleUnexpectedWorkerFailure(const rpc::Address &worker_address); /// Handler for the addition of a new node. /// @@ -659,6 +657,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Map of workers leased out to direct call clients. std::unordered_map> leased_workers_; + /// Map from owner worker ID to a list of worker IDs that the owner has a + /// lease on. + absl::flat_hash_map> leased_workers_by_owner_; + /// Whether new schedule is enabled. const bool new_scheduler_enabled_; @@ -687,8 +689,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler { absl::flat_hash_map> pinned_objects_; - /// XXX + /// Wait for a task's arguments to become ready. void WaitForTaskArgsRequests(std::pair &work); + + // TODO(swang): Evict entries from these caches. + /// Cache for the WorkerFailureTable in the GCS. + absl::flat_hash_set failed_workers_cache_; + /// Cache for the ClientTable in the GCS. + absl::flat_hash_set failed_nodes_cache_; }; } // namespace raylet diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index fe074b542..2f00d7723 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -383,7 +383,7 @@ void TaskDependencyManager::AcquireTaskLease(const TaskID &task_id) { auto task_lease_data = std::make_shared(); task_lease_data->set_task_id(task_id.Binary()); - task_lease_data->set_node_manager_id(client_id_.Hex()); + task_lease_data->set_node_manager_id(client_id_.Binary()); task_lease_data->set_acquired_at(absl::GetCurrentTimeNanos() / 1000000); task_lease_data->set_timeout(it->second.lease_period); RAY_CHECK_OK(gcs_client_->Tasks().AsyncAddTaskLease(task_lease_data, nullptr)); diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index ec850ee66..b95c84125 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -91,6 +91,9 @@ const std::shared_ptr Worker::Connection() const { return connection_; } +void Worker::SetOwnerAddress(const rpc::Address &address) { owner_address_ = address; } +const rpc::Address &Worker::GetOwnerAddress() const { return owner_address_; } + const ResourceIdSet &Worker::GetLifetimeResourceIds() const { return lifetime_resource_ids_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 91721d0a3..79830dace 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -51,6 +51,8 @@ class Worker { void MarkDetachedActor(); bool IsDetachedActor() const; const std::shared_ptr Connection() const; + void SetOwnerAddress(const rpc::Address &address); + const rpc::Address &GetOwnerAddress() const; const ResourceIdSet &GetLifetimeResourceIds() const; void SetLifetimeResourceIds(ResourceIdSet &resource_ids); @@ -117,6 +119,9 @@ class Worker { /// Whether the worker is detached. This is applies when the worker is actor. /// Detached actor means the actor's creator can exit without killing this actor. bool is_detached_actor_; + /// The address of this worker's owner. The owner is the worker that + /// currently holds the lease on this worker, if any. + rpc::Address owner_address_; }; } // namespace raylet