From 58ac8639b99c66b49d15e5737141bf92cf667936 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 11 Dec 2019 14:47:24 -0800 Subject: [PATCH] Fix bad checks and race condition from actor_deaths and node_failures tests (#6411) --- src/ray/common/task/task.h | 2 +- src/ray/core_worker/core_worker.cc | 2 +- src/ray/core_worker/future_resolver.cc | 4 +-- .../test/direct_task_transport_test.cc | 3 +- .../transport/direct_task_transport.cc | 25 ++++++++++++---- src/ray/protobuf/common.proto | 2 ++ src/ray/protobuf/node_manager.proto | 4 ++- src/ray/raylet/node_manager.cc | 29 ++++++++++--------- src/ray/raylet/node_manager.h | 4 +-- src/ray/raylet/raylet_client.cc | 4 ++- src/ray/raylet/raylet_client.h | 7 +++-- src/ray/rpc/worker/core_worker_client.h | 25 ++++++++++++++-- 12 files changed, 79 insertions(+), 32 deletions(-) diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index 41f37655d..a606badec 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -10,7 +10,7 @@ namespace ray { typedef std::function, const std::string &, int, - const ResourceIdSet &)> + const WorkerID &, const ResourceIdSet &)> DispatchTaskCallback; /// Arguments are the raylet ID to spill back to, the raylet's /// address and the raylet's port. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 8ce11bd88..b19d04dfb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -198,7 +198,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, auto client_factory = [this](const rpc::WorkerAddress &addr) { return std::shared_ptr( - new rpc::CoreWorkerClient(addr.first, addr.second, *client_call_manager_)); + new rpc::CoreWorkerClient(addr.ip_address, addr.port, *client_call_manager_)); }; direct_actor_submitter_ = std::unique_ptr( new CoreWorkerDirectActorTaskSubmitter(client_factory, memory_store_, diff --git a/src/ray/core_worker/future_resolver.cc b/src/ray/core_worker/future_resolver.cc index ce9ee8574..84629c698 100644 --- a/src/ray/core_worker/future_resolver.cc +++ b/src/ray/core_worker/future_resolver.cc @@ -8,8 +8,8 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID absl::MutexLock lock(&mu_); auto it = owner_clients_.find(owner_id); if (it == owner_clients_.end()) { - auto client = std::shared_ptr( - client_factory_({owner_address.ip_address(), owner_address.port()})); + auto client = std::shared_ptr(client_factory_( + {owner_address.ip_address(), owner_address.port(), WorkerID::Nil()})); it = owner_clients_.emplace(owner_id, std::move(client)).first; } diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 78790cda5..442f3947c 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -54,7 +54,8 @@ class MockTaskFinisher : public TaskFinisherInterface { class MockRayletClient : public WorkerLeaseInterface { public: - ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override { + ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id, + bool disconnect_worker) override { if (disconnect_worker) { num_workers_disconnected++; } else { diff --git a/src/ray/core_worker/transport/direct_task_transport.cc b/src/ray/core_worker/transport/direct_task_transport.cc index 7250d9dd8..b7058250f 100644 --- a/src/ray/core_worker/transport/direct_task_transport.cc +++ b/src/ray/core_worker/transport/direct_task_transport.cc @@ -29,7 +29,7 @@ void CoreWorkerDirectTaskSubmitter::AddWorkerLeaseClient( if (it == client_cache_.end()) { client_cache_[addr] = std::shared_ptr(client_factory_(addr)); - RAY_LOG(INFO) << "Connected to " << addr.first << ":" << addr.second; + RAY_LOG(INFO) << "Connected to " << addr.ip_address << ":" << addr.port; } int64_t expiration = current_time_ms() + lease_timeout_ms_; worker_to_lease_client_.emplace(addr, @@ -45,7 +45,10 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle( // there are no more applicable queued tasks, or the lease is expired. if (was_error || queue_entry == task_queues_.end() || current_time_ms() > lease_entry.second) { - RAY_CHECK_OK(lease_entry.first->ReturnWorker(addr.second, was_error)); + auto status = lease_entry.first->ReturnWorker(addr.port, addr.worker_id, was_error); + if (!status.ok()) { + RAY_LOG(ERROR) << "Error returning worker to raylet: " << status.ToString(); + } worker_to_lease_client_.erase(addr); } else { auto &client = *client_cache_[addr]; @@ -110,8 +113,9 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded( // We got a lease for a worker. Add the lease client state and try to // assign work to the worker. RAY_LOG(DEBUG) << "Lease granted " << task_id; - rpc::WorkerAddress addr(reply.worker_address().ip_address(), - reply.worker_address().port()); + rpc::WorkerAddress addr = { + reply.worker_address().ip_address(), reply.worker_address().port(), + WorkerID::FromBinary(reply.worker_address().worker_id())}; AddWorkerLeaseClient(addr, std::move(lease_client)); auto resources_copy = reply.resource_mapping(); OnWorkerIdle(addr, scheduling_key, /*error=*/false, resources_copy); @@ -160,7 +164,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( // access the task. request->mutable_task_spec()->CopyFrom(task_spec.GetMessage()); request->mutable_resource_mapping()->CopyFrom(assigned_resources); - RAY_CHECK_OK(client.PushNormalTask( + auto status = client.PushNormalTask( std::move(request), [this, task_id, is_actor, scheduling_key, addr, assigned_resources]( Status status, const rpc::PushTaskReply &reply) { @@ -179,6 +183,15 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask( } else { task_finisher_->CompletePendingTask(task_id, reply); } - })); + }); + if (!status.ok()) { + RAY_LOG(ERROR) << "Error pushing task to worker: " << status.ToString(); + { + absl::MutexLock lock(&mu_); + OnWorkerIdle(addr, scheduling_key, /*error=*/true, assigned_resources); + } + task_finisher_->PendingTaskFailed( + task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED); + } } }; // namespace ray diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 9bfd72463..e62cfd025 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -32,6 +32,8 @@ message Address { bytes raylet_id = 1; string ip_address = 2; int32 port = 3; + // Optional unique id for the worker. + bytes worker_id = 4; } /// The task specification encapsulates all immutable information about the diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 56bc99814..bea723e12 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -24,9 +24,11 @@ message WorkerLeaseReply { message ReturnWorkerRequest { // Port of the leased worker that we are now returning. int32 worker_port = 1; + // Unique id of the leased worker we are now returning. + bytes worker_id = 2; // If true, there was some unrecoverable error and the raylet should // disconnect the worker. - bool disconnect_worker = 2; + bool disconnect_worker = 3; } message ReturnWorkerReply { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4dddd876d..12b674a29 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1177,7 +1177,7 @@ void NodeManager::ProcessDisconnectClientMessage( task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId()); } // Erase any lease metadata. - leased_workers_.erase(worker->Port()); + leased_workers_.erase(worker->WorkerId()); } if (is_worker) { @@ -1562,11 +1562,12 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques reply->mutable_worker_address()->set_ip_address( initial_config_.node_manager_address); reply->mutable_worker_address()->set_port(worker->Port()); + reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary()); reply->mutable_worker_address()->set_raylet_id( gcs_client_->client_table().GetLocalClientId().Binary()); - RAY_CHECK(leased_workers_.find(worker->Port()) == leased_workers_.end()); - leased_workers_[worker->Port()] = worker; - leased_worker_resources_[worker->Port()] = request_resources; + RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end()); + leased_workers_[worker->WorkerId()] = worker; + leased_worker_resources_[worker->WorkerId()] = request_resources; } else { reply->mutable_retry_at_raylet_address()->set_ip_address(address); reply->mutable_retry_at_raylet_address()->set_port(port); @@ -1586,12 +1587,13 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId(); TaskID task_id = task.GetTaskSpecification().TaskId(); task.OnDispatchInstead( - [this, task_id, reply, send_reply_callback](const std::shared_ptr granted, - const std::string &address, int port, - const ResourceIdSet &resource_ids) { + [this, task_id, reply, send_reply_callback]( + const std::shared_ptr granted, const std::string &address, int port, + const WorkerID &worker_id, const ResourceIdSet &resource_ids) { RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id; reply->mutable_worker_address()->set_ip_address(address); reply->mutable_worker_address()->set_port(port); + reply->mutable_worker_address()->set_worker_id(worker_id.Binary()); reply->mutable_worker_address()->set_raylet_id( gcs_client_->client_table().GetLocalClientId().Binary()); for (const auto &mapping : resource_ids.AvailableResources()) { @@ -1613,8 +1615,8 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques // TODO(swang): Kill worker if other end hangs up. // TODO(swang): Implement a lease term by which the owner needs to return the // worker. - RAY_CHECK(leased_workers_.find(port) == leased_workers_.end()); - leased_workers_[port] = std::static_pointer_cast(granted); + RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()); + leased_workers_[worker_id] = std::static_pointer_cast(granted); }); task.OnSpillbackInstead( [reply, task_id, send_reply_callback](const ClientID &spillback_to, @@ -1632,18 +1634,18 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, rpc::ReturnWorkerReply *reply, rpc::SendReplyCallback send_reply_callback) { // Read the resource spec submitted by the client. - auto worker_port = request.worker_port(); - std::shared_ptr worker = std::move(leased_workers_[worker_port]); + auto worker_id = WorkerID::FromBinary(request.worker_id()); + std::shared_ptr worker = std::move(leased_workers_[worker_id]); if (new_scheduler_enabled_) { - auto it = leased_worker_resources_.find(worker_port); + auto it = leased_worker_resources_.find(worker_id); RAY_CHECK(it != leased_worker_resources_.end()); new_resource_scheduler_->AddNodeAvailableResources(client_id_.Binary(), it->second); leased_worker_resources_.erase(it); NewSchedulerSchedulePendingTasks(); } - leased_workers_.erase(worker_port); + leased_workers_.erase(worker_id); Status status; if (worker) { if (request.disconnect_worker()) { @@ -2280,6 +2282,7 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & worker->MarkDetachedActor(); } task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(), + worker->WorkerId(), spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds() : worker->GetTaskResourceIds()); post_assign_callbacks->push_back([this, worker, task_id]() { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index ab8e8ebe1..f2e41358e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -633,7 +633,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { remote_node_manager_clients_; /// Map of workers leased out to direct call clients. - std::unordered_map> leased_workers_; + std::unordered_map> leased_workers_; /// Whether new schedule is enabled. const bool new_scheduler_enabled_; @@ -641,7 +641,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// The new resource scheduler for direct task calls. std::shared_ptr new_resource_scheduler_; /// Map of leased workers to their current resource usage. - std::unordered_map> + std::unordered_map> leased_worker_resources_; typedef std::function, ClientID spillback_to, diff --git a/src/ray/raylet/raylet_client.cc b/src/ray/raylet/raylet_client.cc index dd145de5c..870a11f9a 100644 --- a/src/ray/raylet/raylet_client.cc +++ b/src/ray/raylet/raylet_client.cc @@ -414,9 +414,11 @@ Status raylet::RayletClient::RequestWorkerLease( return grpc_client_->RequestWorkerLease(request, callback); } -Status raylet::RayletClient::ReturnWorker(int worker_port, bool disconnect_worker) { +Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id, + bool disconnect_worker) { rpc::ReturnWorkerRequest request; request.set_worker_port(worker_port); + request.set_worker_id(worker_id.Binary()); request.set_disconnect_worker(disconnect_worker); return grpc_client_->ReturnWorker( request, [](const Status &status, const rpc::ReturnWorkerReply &reply) { diff --git a/src/ray/raylet/raylet_client.h b/src/ray/raylet/raylet_client.h index e45e58abd..06f77dd78 100644 --- a/src/ray/raylet/raylet_client.h +++ b/src/ray/raylet/raylet_client.h @@ -44,9 +44,11 @@ class WorkerLeaseInterface { /// Returns a worker to the raylet. /// \param worker_port The local port of the worker on the raylet node. + /// \param worker_id The unique worker id of the worker on the raylet node. /// \param disconnect_worker Whether the raylet should disconnect the worker. /// \return ray::Status - virtual ray::Status ReturnWorker(int worker_port, bool disconnect_worker) = 0; + virtual ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id, + bool disconnect_worker) = 0; virtual ~WorkerLeaseInterface(){}; }; @@ -243,7 +245,8 @@ class RayletClient : public WorkerLeaseInterface { const ray::rpc::ClientCallback &callback) override; /// Implements WorkerLeaseInterface. - ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override; + ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id, + bool disconnect_worker) override; WorkerID GetWorkerID() const { return worker_id_; } diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index ab70a6998..1e84d1ead 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -8,6 +8,7 @@ #include #include "absl/base/thread_annotations.h" +#include "absl/hash/hash.h" #include "ray/common/status.h" #include "ray/rpc/client_call.h" @@ -34,9 +35,29 @@ const static int64_t RequestSizeInBytes(const PushTaskRequest &request) { } // Shared between direct actor and task submitters. -// TODO(swang): Remove and replace with rpc::Address. class CoreWorkerClientInterface; -typedef std::pair WorkerAddress; + +// TODO(swang): Remove and replace with rpc::Address. +class WorkerAddress { + public: + template + friend H AbslHashValue(H h, const WorkerAddress &w) { + return H::combine(std::move(h), w.ip_address, w.port, w.worker_id); + } + + bool operator==(const WorkerAddress &other) const { + return other.ip_address == ip_address && other.port == port && + other.worker_id == worker_id; + } + + /// The ip address of the worker. + const std::string ip_address; + /// The local port of the worker. + const int port; + /// The unique id of the worker. + const WorkerID worker_id; +}; + typedef std::function(const WorkerAddress &)> ClientFactoryFn;