Fix bad checks and race condition from actor_deaths and node_failures tests (#6411)

This commit is contained in:
Eric Liang
2019-12-11 14:47:24 -08:00
committed by GitHub
parent b3eb374817
commit 58ac8639b9
12 changed files with 79 additions and 32 deletions
+1 -1
View File
@@ -10,7 +10,7 @@
namespace ray {
typedef std::function<void(const std::shared_ptr<void>, const std::string &, int,
const ResourceIdSet &)>
const WorkerID &, const ResourceIdSet &)>
DispatchTaskCallback;
/// Arguments are the raylet ID to spill back to, the raylet's
/// address and the raylet's port.
+1 -1
View File
@@ -198,7 +198,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
auto client_factory = [this](const rpc::WorkerAddress &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr.first, addr.second, *client_call_manager_));
new rpc::CoreWorkerClient(addr.ip_address, addr.port, *client_call_manager_));
};
direct_actor_submitter_ = std::unique_ptr<CoreWorkerDirectActorTaskSubmitter>(
new CoreWorkerDirectActorTaskSubmitter(client_factory, memory_store_,
+2 -2
View File
@@ -8,8 +8,8 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID
absl::MutexLock lock(&mu_);
auto it = owner_clients_.find(owner_id);
if (it == owner_clients_.end()) {
auto client = std::shared_ptr<rpc::CoreWorkerClientInterface>(
client_factory_({owner_address.ip_address(), owner_address.port()}));
auto client = std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(
{owner_address.ip_address(), owner_address.port(), WorkerID::Nil()}));
it = owner_clients_.emplace(owner_id, std::move(client)).first;
}
@@ -54,7 +54,8 @@ class MockTaskFinisher : public TaskFinisherInterface {
class MockRayletClient : public WorkerLeaseInterface {
public:
ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override {
ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) override {
if (disconnect_worker) {
num_workers_disconnected++;
} else {
@@ -29,7 +29,7 @@ void CoreWorkerDirectTaskSubmitter::AddWorkerLeaseClient(
if (it == client_cache_.end()) {
client_cache_[addr] =
std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(addr));
RAY_LOG(INFO) << "Connected to " << addr.first << ":" << addr.second;
RAY_LOG(INFO) << "Connected to " << addr.ip_address << ":" << addr.port;
}
int64_t expiration = current_time_ms() + lease_timeout_ms_;
worker_to_lease_client_.emplace(addr,
@@ -45,7 +45,10 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(
// there are no more applicable queued tasks, or the lease is expired.
if (was_error || queue_entry == task_queues_.end() ||
current_time_ms() > lease_entry.second) {
RAY_CHECK_OK(lease_entry.first->ReturnWorker(addr.second, was_error));
auto status = lease_entry.first->ReturnWorker(addr.port, addr.worker_id, was_error);
if (!status.ok()) {
RAY_LOG(ERROR) << "Error returning worker to raylet: " << status.ToString();
}
worker_to_lease_client_.erase(addr);
} else {
auto &client = *client_cache_[addr];
@@ -110,8 +113,9 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
// We got a lease for a worker. Add the lease client state and try to
// assign work to the worker.
RAY_LOG(DEBUG) << "Lease granted " << task_id;
rpc::WorkerAddress addr(reply.worker_address().ip_address(),
reply.worker_address().port());
rpc::WorkerAddress addr = {
reply.worker_address().ip_address(), reply.worker_address().port(),
WorkerID::FromBinary(reply.worker_address().worker_id())};
AddWorkerLeaseClient(addr, std::move(lease_client));
auto resources_copy = reply.resource_mapping();
OnWorkerIdle(addr, scheduling_key, /*error=*/false, resources_copy);
@@ -160,7 +164,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->mutable_resource_mapping()->CopyFrom(assigned_resources);
RAY_CHECK_OK(client.PushNormalTask(
auto status = client.PushNormalTask(
std::move(request),
[this, task_id, is_actor, scheduling_key, addr, assigned_resources](
Status status, const rpc::PushTaskReply &reply) {
@@ -179,6 +183,15 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
} else {
task_finisher_->CompletePendingTask(task_id, reply);
}
}));
});
if (!status.ok()) {
RAY_LOG(ERROR) << "Error pushing task to worker: " << status.ToString();
{
absl::MutexLock lock(&mu_);
OnWorkerIdle(addr, scheduling_key, /*error=*/true, assigned_resources);
}
task_finisher_->PendingTaskFailed(
task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED);
}
}
}; // namespace ray
+2
View File
@@ -32,6 +32,8 @@ message Address {
bytes raylet_id = 1;
string ip_address = 2;
int32 port = 3;
// Optional unique id for the worker.
bytes worker_id = 4;
}
/// The task specification encapsulates all immutable information about the
+3 -1
View File
@@ -24,9 +24,11 @@ message WorkerLeaseReply {
message ReturnWorkerRequest {
// Port of the leased worker that we are now returning.
int32 worker_port = 1;
// Unique id of the leased worker we are now returning.
bytes worker_id = 2;
// If true, there was some unrecoverable error and the raylet should
// disconnect the worker.
bool disconnect_worker = 2;
bool disconnect_worker = 3;
}
message ReturnWorkerReply {
+16 -13
View File
@@ -1177,7 +1177,7 @@ void NodeManager::ProcessDisconnectClientMessage(
task_dependency_manager_.UnsubscribeWaitDependencies(worker->WorkerId());
}
// Erase any lease metadata.
leased_workers_.erase(worker->Port());
leased_workers_.erase(worker->WorkerId());
}
if (is_worker) {
@@ -1562,11 +1562,12 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques
reply->mutable_worker_address()->set_ip_address(
initial_config_.node_manager_address);
reply->mutable_worker_address()->set_port(worker->Port());
reply->mutable_worker_address()->set_worker_id(worker->WorkerId().Binary());
reply->mutable_worker_address()->set_raylet_id(
gcs_client_->client_table().GetLocalClientId().Binary());
RAY_CHECK(leased_workers_.find(worker->Port()) == leased_workers_.end());
leased_workers_[worker->Port()] = worker;
leased_worker_resources_[worker->Port()] = request_resources;
RAY_CHECK(leased_workers_.find(worker->WorkerId()) == leased_workers_.end());
leased_workers_[worker->WorkerId()] = worker;
leased_worker_resources_[worker->WorkerId()] = request_resources;
} else {
reply->mutable_retry_at_raylet_address()->set_ip_address(address);
reply->mutable_retry_at_raylet_address()->set_port(port);
@@ -1586,12 +1587,13 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques
RAY_LOG(DEBUG) << "Worker lease request " << task.GetTaskSpecification().TaskId();
TaskID task_id = task.GetTaskSpecification().TaskId();
task.OnDispatchInstead(
[this, task_id, reply, send_reply_callback](const std::shared_ptr<void> granted,
const std::string &address, int port,
const ResourceIdSet &resource_ids) {
[this, task_id, reply, send_reply_callback](
const std::shared_ptr<void> granted, const std::string &address, int port,
const WorkerID &worker_id, const ResourceIdSet &resource_ids) {
RAY_LOG(DEBUG) << "Worker lease request DISPATCH " << task_id;
reply->mutable_worker_address()->set_ip_address(address);
reply->mutable_worker_address()->set_port(port);
reply->mutable_worker_address()->set_worker_id(worker_id.Binary());
reply->mutable_worker_address()->set_raylet_id(
gcs_client_->client_table().GetLocalClientId().Binary());
for (const auto &mapping : resource_ids.AvailableResources()) {
@@ -1613,8 +1615,8 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques
// TODO(swang): Kill worker if other end hangs up.
// TODO(swang): Implement a lease term by which the owner needs to return the
// worker.
RAY_CHECK(leased_workers_.find(port) == leased_workers_.end());
leased_workers_[port] = std::static_pointer_cast<Worker>(granted);
RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end());
leased_workers_[worker_id] = std::static_pointer_cast<Worker>(granted);
});
task.OnSpillbackInstead(
[reply, task_id, send_reply_callback](const ClientID &spillback_to,
@@ -1632,18 +1634,18 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,
rpc::ReturnWorkerReply *reply,
rpc::SendReplyCallback send_reply_callback) {
// Read the resource spec submitted by the client.
auto worker_port = request.worker_port();
std::shared_ptr<Worker> worker = std::move(leased_workers_[worker_port]);
auto worker_id = WorkerID::FromBinary(request.worker_id());
std::shared_ptr<Worker> worker = std::move(leased_workers_[worker_id]);
if (new_scheduler_enabled_) {
auto it = leased_worker_resources_.find(worker_port);
auto it = leased_worker_resources_.find(worker_id);
RAY_CHECK(it != leased_worker_resources_.end());
new_resource_scheduler_->AddNodeAvailableResources(client_id_.Binary(), it->second);
leased_worker_resources_.erase(it);
NewSchedulerSchedulePendingTasks();
}
leased_workers_.erase(worker_port);
leased_workers_.erase(worker_id);
Status status;
if (worker) {
if (request.disconnect_worker()) {
@@ -2280,6 +2282,7 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
worker->MarkDetachedActor();
}
task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(),
worker->WorkerId(),
spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds()
: worker->GetTaskResourceIds());
post_assign_callbacks->push_back([this, worker, task_id]() {
+2 -2
View File
@@ -633,7 +633,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
remote_node_manager_clients_;
/// Map of workers leased out to direct call clients.
std::unordered_map<int, std::shared_ptr<Worker>> leased_workers_;
std::unordered_map<WorkerID, std::shared_ptr<Worker>> leased_workers_;
/// Whether new schedule is enabled.
const bool new_scheduler_enabled_;
@@ -641,7 +641,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// The new resource scheduler for direct task calls.
std::shared_ptr<ClusterResourceScheduler> new_resource_scheduler_;
/// Map of leased workers to their current resource usage.
std::unordered_map<int, std::unordered_map<std::string, double>>
std::unordered_map<WorkerID, std::unordered_map<std::string, double>>
leased_worker_resources_;
typedef std::function<void(std::shared_ptr<Worker>, ClientID spillback_to,
+3 -1
View File
@@ -414,9 +414,11 @@ Status raylet::RayletClient::RequestWorkerLease(
return grpc_client_->RequestWorkerLease(request, callback);
}
Status raylet::RayletClient::ReturnWorker(int worker_port, bool disconnect_worker) {
Status raylet::RayletClient::ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) {
rpc::ReturnWorkerRequest request;
request.set_worker_port(worker_port);
request.set_worker_id(worker_id.Binary());
request.set_disconnect_worker(disconnect_worker);
return grpc_client_->ReturnWorker(
request, [](const Status &status, const rpc::ReturnWorkerReply &reply) {
+5 -2
View File
@@ -44,9 +44,11 @@ class WorkerLeaseInterface {
/// Returns a worker to the raylet.
/// \param worker_port The local port of the worker on the raylet node.
/// \param worker_id The unique worker id of the worker on the raylet node.
/// \param disconnect_worker Whether the raylet should disconnect the worker.
/// \return ray::Status
virtual ray::Status ReturnWorker(int worker_port, bool disconnect_worker) = 0;
virtual ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) = 0;
virtual ~WorkerLeaseInterface(){};
};
@@ -243,7 +245,8 @@ class RayletClient : public WorkerLeaseInterface {
const ray::rpc::ClientCallback<ray::rpc::WorkerLeaseReply> &callback) override;
/// Implements WorkerLeaseInterface.
ray::Status ReturnWorker(int worker_port, bool disconnect_worker) override;
ray::Status ReturnWorker(int worker_port, const WorkerID &worker_id,
bool disconnect_worker) override;
WorkerID GetWorkerID() const { return worker_id_; }
+23 -2
View File
@@ -8,6 +8,7 @@
#include <grpcpp/grpcpp.h>
#include "absl/base/thread_annotations.h"
#include "absl/hash/hash.h"
#include "ray/common/status.h"
#include "ray/rpc/client_call.h"
@@ -34,9 +35,29 @@ const static int64_t RequestSizeInBytes(const PushTaskRequest &request) {
}
// Shared between direct actor and task submitters.
// TODO(swang): Remove and replace with rpc::Address.
class CoreWorkerClientInterface;
typedef std::pair<std::string, int> WorkerAddress;
// TODO(swang): Remove and replace with rpc::Address.
class WorkerAddress {
public:
template <typename H>
friend H AbslHashValue(H h, const WorkerAddress &w) {
return H::combine(std::move(h), w.ip_address, w.port, w.worker_id);
}
bool operator==(const WorkerAddress &other) const {
return other.ip_address == ip_address && other.port == port &&
other.worker_id == worker_id;
}
/// The ip address of the worker.
const std::string ip_address;
/// The local port of the worker.
const int port;
/// The unique id of the worker.
const WorkerID worker_id;
};
typedef std::function<std::shared_ptr<CoreWorkerClientInterface>(const WorkerAddress &)>
ClientFactoryFn;