[Core] Multi-tenancy: Reject worker registration if job has finished (#10569)

This commit is contained in:
Kai Yang
2020-09-14 14:49:31 +08:00
committed by GitHub
parent 5bc2ba38fd
commit 4c03f7ca2f
9 changed files with 182 additions and 99 deletions
+31 -1
View File
@@ -9,7 +9,8 @@ import pytest
import ray
import ray.test_utils
from ray.core.generated import node_manager_pb2, node_manager_pb2_grpc
from ray.test_utils import wait_for_condition, run_string_as_driver_nonblocking
from ray.test_utils import (wait_for_condition, run_string_as_driver,
run_string_as_driver_nonblocking)
def get_num_workers():
@@ -211,5 +212,34 @@ def test_worker_capping_run_chained_tasks(shutdown_only):
assert get_num_workers() == 2
def test_worker_registration_failure_after_driver_exit(shutdown_only):
info = ray.init(num_cpus=1, _system_config={"enable_multi_tenancy": True})
driver_code = """
import ray
import time
ray.init(address="{}")
@ray.remote
def foo():
pass
[foo.remote() for _ in range(100)]
ray.shutdown()
""".format(info["redis_address"])
before = get_num_workers()
assert before == 1
run_string_as_driver(driver_code)
# wait for a while to let workers register
time.sleep(2)
wait_for_condition(lambda: get_num_workers() == before, timeout=10)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
+16 -6
View File
@@ -301,20 +301,30 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
// instead of crashing.
auto grpc_client = rpc::NodeManagerWorkerClient::make(
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
Status raylet_client_status;
ClientID local_raylet_id;
int assigned_port;
std::unordered_map<std::string, std::string> system_config;
local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
options_.node_ip_address, &local_raylet_id, &assigned_port, &system_config,
options_.serialized_job_config));
options_.node_ip_address, &raylet_client_status, &local_raylet_id, &assigned_port,
&system_config, options_.serialized_job_config));
if (!raylet_client_status.ok()) {
// Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
RAY_LOG(ERROR) << "Failed to register worker " << worker_id << " to Raylet. "
<< raylet_client_status;
if (options_.enable_logging) {
RayLog::ShutDownRayLog();
}
// Quit the process immediately.
_Exit(1);
}
connected_ = true;
RAY_CHECK(assigned_port != -1)
<< "Failed to allocate a port for the worker. Please specify a wider port range "
"using the '--min-worker-port' and '--max-worker-port' arguments to 'ray "
"start'.";
RAY_CHECK(assigned_port >= 0);
// NOTE(edoakes): any initialization depending on RayConfig must happen after this line.
RayConfig::instance().initialize(system_config);
+4
View File
@@ -150,6 +150,10 @@ table RegisterClientRequest {
}
table RegisterClientReply {
// Whether the registration succeeded.
success: bool;
// The reason of registration failure.
failure_reason: string;
// GCS ClientID of the local node manager.
raylet_id: string;
// Port that this worker should listen on.
+10 -3
View File
@@ -1260,7 +1260,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
auto worker = std::dynamic_pointer_cast<WorkerInterface>(std::make_shared<Worker>(
worker_id, language, worker_type, worker_ip_address, client, client_call_manager_));
auto send_reply_callback = [this, client](int assigned_port) {
auto send_reply_callback = [this, client](Status status, int assigned_port) {
flatbuffers::FlatBufferBuilder fbb;
std::vector<std::string> system_config_keys;
std::vector<std::string> system_config_values;
@@ -1269,7 +1269,8 @@ void NodeManager::ProcessRegisterClientRequestMessage(
system_config_values.push_back(kv.second);
}
auto reply = ray::protocol::CreateRegisterClientReply(
fbb, to_flatbuf(fbb, self_node_id_), assigned_port,
fbb, status.ok(), fbb.CreateString(status.ToString()),
to_flatbuf(fbb, self_node_id_), assigned_port,
string_vec_to_flatbuf(fbb, system_config_keys),
string_vec_to_flatbuf(fbb, system_config_values));
fbb.Finish(reply);
@@ -1285,7 +1286,13 @@ void NodeManager::ProcessRegisterClientRequestMessage(
if (worker_type == rpc::WorkerType::WORKER ||
worker_type == rpc::WorkerType::IO_WORKER) {
// Register the new worker.
RAY_UNUSED(worker_pool_.RegisterWorker(worker, pid, send_reply_callback));
auto status = worker_pool_.RegisterWorker(worker, pid, send_reply_callback);
if (!status.ok()) {
// If the worker failed to register to Raylet, trigger task dispatching here to
// allow new worker processes to be started (if capped by
// maximum_startup_concurrency).
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
} else {
// Register the new driver.
RAY_CHECK(pid >= 0);
+90 -79
View File
@@ -421,7 +421,6 @@ Status WorkerPool::GetNextFreePort(int *port) {
// Return to pool to check later.
free_ports_->push(*port);
}
*port = -1;
return Status::Invalid(
"No available ports. Please specify a wider port range using --min-worker-port and "
"--max-worker-port.");
@@ -444,115 +443,127 @@ void WorkerPool::HandleJobFinished(const JobID &job_id) {
Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker,
pid_t pid,
std::function<void(int)> send_reply_callback) {
std::function<void(Status, int)> send_reply_callback) {
RAY_CHECK(worker);
// The port that this worker's gRPC server should listen on. 0 if the worker
// should bind on a random port.
int port;
Status status;
auto &state = GetStateForLanguage(worker->GetLanguage());
auto it = state.starting_worker_processes.find(Process::FromPid(pid));
if (it == state.starting_worker_processes.end()) {
RAY_LOG(WARNING) << "Received a register request from an unknown worker " << pid;
// Return -1 to signal to the worker that registration failed.
port = -1;
status = Status::Invalid("Unknown worker");
} else {
RAY_RETURN_NOT_OK(GetNextFreePort(&port));
RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port
<< ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType());
worker->SetAssignedPort(port);
worker->SetProcess(it->first);
it->second--;
if (it->second == 0) {
state.starting_worker_processes.erase(it);
// We may have slots to start more workers now.
TryStartIOWorkers(worker->GetLanguage(), state);
Status status = Status::Invalid("Unknown worker");
send_reply_callback(status, /*port=*/0);
return status;
}
// The port that this worker's gRPC server should listen on. 0 if the worker
// should bind on a random port.
int port = 0;
Status status = GetNextFreePort(&port);
if (!status.ok()) {
send_reply_callback(status, /*port=*/0);
return status;
}
RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port
<< ", worker_type: " << rpc::WorkerType_Name(worker->GetWorkerType());
worker->SetAssignedPort(port);
worker->SetProcess(it->first);
it->second--;
if (it->second == 0) {
state.starting_worker_processes.erase(it);
// We may have slots to start more workers now.
TryStartIOWorkers(worker->GetLanguage(), state);
}
RAY_CHECK(worker->GetProcess().GetId() == pid);
state.registered_workers.insert(worker);
if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) {
state.registered_io_workers.insert(worker);
state.num_starting_io_workers--;
}
if (RayConfig::instance().enable_multi_tenancy()) {
auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid);
RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end());
auto job_id = dedicated_workers_it->second;
// If the job is finished, we don't allow new registrations.
if (!unfinished_jobs_.contains(job_id)) {
auto process = Process::FromPid(pid);
state.starting_worker_processes.erase(process);
Status status =
Status::Invalid("The job is not running anymore. Reject registration.");
send_reply_callback(status, /*port=*/0);
return status;
}
RAY_CHECK(worker->GetProcess().GetId() == pid);
state.registered_workers.insert(worker);
if (worker->GetWorkerType() == rpc::WorkerType::IO_WORKER) {
state.registered_io_workers.insert(worker);
state.num_starting_io_workers--;
}
worker->AssignJobId(job_id);
// We don't call state.worker_pids_to_assigned_jobs.erase(job_id) here
// because we allow multi-workers per worker process.
if (RayConfig::instance().enable_multi_tenancy()) {
auto dedicated_workers_it = state.worker_pids_to_assigned_jobs.find(pid);
RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end());
auto job_id = dedicated_workers_it->second;
worker->AssignJobId(job_id);
// We don't call state.worker_pids_to_assigned_jobs.erase(job_id) here
// because we allow multi-workers per worker process.
// This is a workaround to finish driver registration after all initial workers are
// registered to Raylet if and only if Raylet is started by a Python driver and the
// job config is not set in `ray.init(...)`.
if (first_job_ == job_id && worker->GetLanguage() == Language::PYTHON) {
if (++first_job_registered_python_worker_count_ ==
first_job_driver_wait_num_python_workers_) {
if (first_job_send_register_client_reply_to_driver_) {
first_job_send_register_client_reply_to_driver_();
first_job_send_register_client_reply_to_driver_ = nullptr;
}
// This is a workaround to finish driver registration after all initial workers are
// registered to Raylet if and only if Raylet is started by a Python driver and the
// job config is not set in `ray.init(...)`.
if (first_job_ == job_id && worker->GetLanguage() == Language::PYTHON) {
if (++first_job_registered_python_worker_count_ ==
first_job_driver_wait_num_python_workers_) {
if (first_job_send_register_client_reply_to_driver_) {
first_job_send_register_client_reply_to_driver_();
first_job_send_register_client_reply_to_driver_ = nullptr;
}
}
}
status = Status::OK();
}
// Send the reply immediately for worker registrations.
if (send_reply_callback) {
send_reply_callback(port);
}
return status;
send_reply_callback(Status::OK(), port);
return Status::OK();
}
Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver,
const JobID &job_id, const rpc::JobConfig &job_config,
std::function<void(int)> send_reply_callback) {
std::function<void(Status, int)> send_reply_callback) {
int port;
RAY_CHECK(!driver->GetAssignedTaskId().IsNil());
RAY_RETURN_NOT_OK(GetNextFreePort(&port));
Status status = GetNextFreePort(&port);
if (!status.ok()) {
send_reply_callback(status, /*port=*/0);
return status;
}
driver->SetAssignedPort(port);
auto &state = GetStateForLanguage(driver->GetLanguage());
state.registered_drivers.insert(std::move(driver));
driver->AssignJobId(job_id);
unfinished_jobs_[job_id] = job_config;
if (send_reply_callback) {
// This is a workaround to start initial workers on this node if and only if Raylet is
// started by a Python driver and the job config is not set in `ray.init(...)`.
// Invoke the `send_reply_callback` later to only finish driver
// registration after all initial workers are registered to Raylet.
bool delay_callback = false;
// Multi-tenancy is enabled.
if (RayConfig().instance().enable_multi_tenancy()) {
// If this is the first job.
if (first_job_.IsNil()) {
first_job_ = job_id;
// If the number of Python workers we need to wait is positive.
if (num_initial_python_workers_for_first_job_ > 0) {
delay_callback = true;
// Start initial Python workers for the first job.
for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) {
StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id);
}
// This is a workaround to start initial workers on this node if and only if Raylet is
// started by a Python driver and the job config is not set in `ray.init(...)`.
// Invoke the `send_reply_callback` later to only finish driver
// registration after all initial workers are registered to Raylet.
bool delay_callback = false;
// Multi-tenancy is enabled.
if (RayConfig().instance().enable_multi_tenancy()) {
// If this is the first job.
if (first_job_.IsNil()) {
first_job_ = job_id;
// If the number of Python workers we need to wait is positive.
if (num_initial_python_workers_for_first_job_ > 0) {
delay_callback = true;
// Start initial Python workers for the first job.
for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) {
StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id);
}
}
}
}
if (delay_callback) {
RAY_CHECK(!first_job_send_register_client_reply_to_driver_);
first_job_send_register_client_reply_to_driver_ = [send_reply_callback, port]() {
send_reply_callback(port);
};
} else {
send_reply_callback(port);
}
if (delay_callback) {
RAY_CHECK(!first_job_send_register_client_reply_to_driver_);
first_job_send_register_client_reply_to_driver_ = [send_reply_callback, port]() {
send_reply_callback(Status::OK(), port);
};
} else {
send_reply_callback(Status::OK(), port);
}
return Status::OK();
+2 -2
View File
@@ -122,7 +122,7 @@ class WorkerPool : public WorkerPoolInterface {
/// Returns 0 if the worker should bind on a random port.
/// \return If the registration is successful.
Status RegisterWorker(const std::shared_ptr<WorkerInterface> &worker, pid_t pid,
std::function<void(int)> send_reply_callback);
std::function<void(Status, int)> send_reply_callback);
/// Register a new driver.
///
@@ -134,7 +134,7 @@ class WorkerPool : public WorkerPoolInterface {
/// \return If the registration is successful.
Status RegisterDriver(const std::shared_ptr<WorkerInterface> &worker,
const JobID &job_id, const rpc::JobConfig &job_config,
std::function<void(int)> send_reply_callback);
std::function<void(Status, int)> send_reply_callback);
/// Get the client connection's registered worker.
///
+9 -2
View File
@@ -135,7 +135,8 @@ class WorkerPoolTest : public ::testing::TestWithParam<bool> {
const rpc::JobConfig &job_config = rpc::JobConfig()) {
auto driver = CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id);
driver->AssignTaskId(TaskID::ForDriverTask(job_id));
RAY_CHECK_OK(worker_pool_->RegisterDriver(driver, job_id, job_config, nullptr));
RAY_CHECK_OK(
worker_pool_->RegisterDriver(driver, job_id, job_config, [](Status, int) {}));
return driver;
}
@@ -246,7 +247,7 @@ TEST_P(WorkerPoolTest, HandleWorkerRegistration) {
ASSERT_EQ(worker_pool_->NumWorkerProcessesStarting(), 1);
// Check that we cannot lookup the worker before it's registered.
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), nullptr);
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), nullptr));
RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), [](Status, int) {}));
// Check that we can lookup the worker after it's registered.
ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker);
}
@@ -259,6 +260,12 @@ TEST_P(WorkerPoolTest, HandleWorkerRegistration) {
}
}
TEST_P(WorkerPoolTest, HandleUnknownWorkerRegistration) {
auto worker = CreateWorker(Process(), Language::PYTHON);
auto status = worker_pool_->RegisterWorker(worker, 1234, [](Status, int) {});
ASSERT_FALSE(status.ok());
}
TEST_P(WorkerPoolTest, StartupPythonWorkerProcessCount) {
TestStartupWorkerProcessCount(Language::PYTHON, 1, {"dummy_py_worker_command"});
}
+17 -4
View File
@@ -82,7 +82,7 @@ raylet::RayletClient::RayletClient(
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, ClientID *raylet_id, int *port,
const std::string &ip_address, Status *status, ClientID *raylet_id, int *port,
std::unordered_map<std::string, std::string> *system_config,
const std::string &job_config)
: grpc_client_(std::move(grpc_client)),
@@ -103,10 +103,23 @@ raylet::RayletClient::RayletClient(
// Register the process ID with the raylet.
// NOTE(swang): If raylet exits and we are registered as a worker, we will get killed.
std::vector<uint8_t> reply;
auto status = conn_->AtomicRequestReply(MessageType::RegisterClientRequest,
MessageType::RegisterClientReply, &reply, &fbb);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet.");
auto request_status = conn_->AtomicRequestReply(
MessageType::RegisterClientRequest, MessageType::RegisterClientReply, &reply, &fbb);
if (!request_status.ok()) {
*status =
Status(request_status.code(),
std::string("[RayletClient] Unable to register worker with raylet. ") +
request_status.message());
return;
}
auto reply_message = flatbuffers::GetRoot<protocol::RegisterClientReply>(reply.data());
bool success = reply_message->success();
if (success) {
*status = Status::OK();
} else {
*status = Status::Invalid(string_from_flatbuf(*reply_message->failure_reason()));
return;
}
*raylet_id = ClientID::FromBinary(reply_message->raylet_id()->str());
*port = reply_message->port();
+3 -2
View File
@@ -181,6 +181,7 @@ class RayletClient : public PinObjectsInterface,
/// \param job_id The ID of the driver. This is non-nil if the client is a driver.
/// \param language Language of the worker.
/// \param ip_address The IP address of the worker.
/// \param status This will be populated with the result of connection attempt.
/// \param raylet_id This will be populated with the local raylet's ClientID.
/// \param system_config This will be populated with internal config parameters
/// provided by the raylet.
@@ -190,8 +191,8 @@ class RayletClient : public PinObjectsInterface,
std::shared_ptr<ray::rpc::NodeManagerWorkerClient> grpc_client,
const std::string &raylet_socket, const WorkerID &worker_id,
rpc::WorkerType worker_type, const JobID &job_id, const Language &language,
const std::string &ip_address, ClientID *raylet_id, int *port,
std::unordered_map<std::string, std::string> *system_config,
const std::string &ip_address, Status *status, ClientID *raylet_id,
int *port, std::unordered_map<std::string, std::string> *system_config,
const std::string &job_config);
/// Connect to the raylet via grpc only.