From fb8e3615d59067679fba9daa6840fff3a2d46bbc Mon Sep 17 00:00:00 2001 From: mehrdadn Date: Wed, 15 Jan 2020 20:05:02 -0800 Subject: [PATCH] Use Boost.Process instead of pid_t (#6510) * Use Boost.Process instead of pid_t This will let us handle child processes (mostly) uniformly across platforms. TODO: There is no SIGTERM on Windows; achieving something equivalent is fairly involved. --- BUILD.bazel | 1 + bazel/ray_deps_setup.bzl | 2 + python/ray/test_utils.py | 6 +- src/ray/raylet/node_manager.cc | 61 +++--- src/ray/raylet/worker.cc | 10 +- src/ray/raylet/worker.h | 15 +- src/ray/raylet/worker_pool.cc | 177 +++++++----------- src/ray/raylet/worker_pool.h | 26 ++- src/ray/raylet/worker_pool_test.cc | 100 ++++++---- src/ray/util/process.h | 123 ++++++++++++ ...oost-process-teminate-waitpid-nohang.patch | 14 ++ 11 files changed, 333 insertions(+), 202 deletions(-) create mode 100644 src/ray/util/process.h create mode 100644 thirdparty/patches/boost-process-teminate-waitpid-nohang.patch diff --git a/BUILD.bazel b/BUILD.bazel index df47cf0ae..2d99efd65 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -399,6 +399,7 @@ cc_library( ":stats_lib", ":worker_rpc", "@boost//:asio", + "@boost//:process", "@com_github_jupp0r_prometheus_cpp//pull", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/container:flat_hash_set", diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index e4770090d..60119d7fd 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -123,6 +123,8 @@ def ray_deps_setup(): # Backport Clang-Cl patch on Boost 1.69 to Boost <= 1.68: # https://lists.boost.org/Archives/boost/2018/09/243420.php "//thirdparty/patches:boost-type_traits-trivial_move.patch", + # Partially backport waitpid() patch on Boost 1.72 to Boost <= 1.68 + "//thirdparty/patches:boost-process-teminate-waitpid-nohang.patch", ], ) diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index aac2f44d7..cdd0ff6ba 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -39,7 +39,7 @@ def wait_for_pid_to_exit(pid, timeout=20): return time.sleep(0.1) raise RayTestTimeoutException( - "Timed out while waiting for process to exit.") + "Timed out while waiting for process {} to exit.".format(pid)) def wait_for_children_of_pid(pid, num_children=1, timeout=20): @@ -51,8 +51,8 @@ def wait_for_children_of_pid(pid, num_children=1, timeout=20): return time.sleep(0.1) raise RayTestTimeoutException( - "Timed out while waiting for process children to start " - "({}/{} started).".format(num_alive, num_children)) + "Timed out while waiting for process {} children to start " + "({}/{} started).".format(pid, num_alive, num_children)) def wait_for_children_of_pid_to_exit(pid, timeout=20): diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 976c35fa0..399154641 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -89,8 +89,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, object_manager_profile_timer_(io_service), initial_config_(config), local_available_resources_(config.resource_config), - worker_pool_(config.num_initial_workers, config.maximum_startup_concurrency, - gcs_client_, config.worker_commands), + worker_pool_(io_service, config.num_initial_workers, + config.maximum_startup_concurrency, gcs_client_, + config.worker_commands), scheduling_policy_(local_queues_), reconstruction_policy_( io_service_, @@ -228,22 +229,23 @@ void NodeManager::HandleUnexpectedWorkerFailure( } void NodeManager::KillWorker(std::shared_ptr worker) { +#ifdef _WIN32 + // TODO(mehrdadn): Implement implement graceful process termination mechanism +#else // If we're just cleaning up a single worker, allow it some time to clean // up its state before force killing. The client socket will be closed // and the worker struct will be freed after the timeout. - kill(worker->Pid(), SIGTERM); + kill(worker->Process().get()->id(), SIGTERM); +#endif auto retry_timer = std::make_shared(io_service_); auto retry_duration = boost::posix_time::milliseconds( RayConfig::instance().kill_worker_timeout_milliseconds()); retry_timer->expires_from_now(retry_duration); retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) { - RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Pid(); - // Force kill worker. TODO(mehrdadn, rkn): The worker may have already died - // and had its PID reassigned to a different process, at least on Windows. - // On Linux, this may or may not be the case, depending on e.g. whether - // the process has been already waited on. Regardless, this must be fixed. - kill(worker->Pid(), SIGKILL); + RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Process().get()->id(); + // Force kill worker + worker->Process().get()->terminate(); }); } @@ -855,8 +857,9 @@ void NodeManager::ProcessClientMessage( RAY_LOG(DEBUG) << "[Worker] Message " << protocol::EnumNameMessageType(message_type_value) << "(" << message_type << ") from worker with PID " - << (registered_worker ? std::to_string(registered_worker->Pid()) - : "nil"); + << (registered_worker + ? std::to_string(registered_worker->Process().get()->id()) + : "nil"); if (registered_worker && registered_worker->IsDead()) { // For a worker that is marked as dead (because the job has died already), // all the messages are ignored except DisconnectClient. @@ -963,12 +966,6 @@ void NodeManager::ProcessClientMessage( void NodeManager::ProcessRegisterClientRequestMessage( const std::shared_ptr &client, const uint8_t *message_data) { client->Register(); - auto message = flatbuffers::GetRoot(message_data); - Language language = static_cast(message->language()); - WorkerID worker_id = from_flatbuf(*message->worker_id()); - auto worker = std::make_shared(worker_id, message->worker_pid(), language, - message->port(), client, client_call_manager_); - Status status; flatbuffers::FlatBufferBuilder fbb; auto reply = ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_)); @@ -983,24 +980,31 @@ void NodeManager::ProcessRegisterClientRequestMessage( } }); + auto message = flatbuffers::GetRoot(message_data); + Language language = static_cast(message->language()); + WorkerID worker_id = from_flatbuf(*message->worker_id()); + pid_t pid = message->worker_pid(); + auto worker = std::make_shared(worker_id, language, message->port(), client, + client_call_manager_); if (message->is_worker()) { // Register the new worker. - if (worker_pool_.RegisterWorker(worker).ok()) { + if (worker_pool_.RegisterWorker(worker, pid).ok()) { HandleWorkerAvailable(worker->Connection()); } } else { // Register the new driver. + worker->SetProcess(ProcessHandle::FromPid(pid)); const JobID job_id = from_flatbuf(*message->job_id()); // Compute a dummy driver task id from a given driver. const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id); worker->AssignTaskId(driver_task_id); worker->AssignJobId(job_id); - status = worker_pool_.RegisterDriver(worker); + Status status = worker_pool_.RegisterDriver(worker); if (status.ok()) { local_queues_.AddDriverTaskId(driver_task_id); - auto job_data_ptr = gcs::CreateJobTableData( - job_id, /*is_dead*/ false, std::time(nullptr), - initial_config_.node_manager_address, message->worker_pid()); + auto job_data_ptr = + gcs::CreateJobTableData(job_id, /*is_dead*/ false, std::time(nullptr), + initial_config_.node_manager_address, pid); RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr)); } } @@ -1196,7 +1200,8 @@ void NodeManager::ProcessDisconnectClientMessage( cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); worker->ResetLifetimeResourceIds(); - RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. " + RAY_LOG(DEBUG) << "Worker (pid=" << worker->Process().get()->id() + << ") is disconnected. " << "job_id: " << worker->GetAssignedJobId(); // Since some resources may have been released, we can try to dispatch more tasks. @@ -1210,7 +1215,8 @@ void NodeManager::ProcessDisconnectClientMessage( local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id)); worker_pool_.DisconnectDriver(worker); - RAY_LOG(DEBUG) << "Driver (pid=" << worker->Pid() << ") is disconnected. " + RAY_LOG(DEBUG) << "Driver (pid=" << worker->Process().get()->id() + << ") is disconnected. " << "job_id: " << job_id; } @@ -2290,7 +2296,8 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & } RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid " - << worker->Pid() << ", worker id: " << worker->WorkerId(); + << worker->Process().get()->id() + << ", worker id: " << worker->WorkerId(); flatbuffers::FlatBufferBuilder fbb; // Resource accounting: acquire resources for the assigned task. @@ -3076,7 +3083,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, rpc::SendReplyCallback send_reply_callback) { for (const auto &driver : worker_pool_.GetAllDrivers()) { auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(driver->Pid()); + worker_stats->set_pid(driver->Process().get()->id()); worker_stats->set_is_driver(true); } for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { @@ -3139,7 +3146,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, << status.ToString(); } else { auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(worker->Pid()); + worker_stats->set_pid(worker->Process().get()->id()); worker_stats->set_is_driver(false); reply->set_num_workers(reply->num_workers() + 1); worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index a02e65a78..ec850ee66 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -12,11 +12,10 @@ namespace ray { namespace raylet { /// A constructor responsible for initializing the state of a worker. -Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port, +Worker::Worker(const WorkerID &worker_id, const Language &language, int port, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager) : worker_id_(worker_id), - pid_(pid), language_(language), port_(port), connection_(connection), @@ -42,7 +41,12 @@ bool Worker::IsBlocked() const { return blocked_; } WorkerID Worker::WorkerId() const { return worker_id_; } -pid_t Worker::Pid() const { return pid_; } +ProcessHandle Worker::Process() const { return proc_; } + +void Worker::SetProcess(const ProcessHandle &proc) { + RAY_CHECK(!proc_); // this procedure should not be called multiple times + proc_ = proc; +} Language Worker::GetLanguage() const { return language_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index fde77f4a2..91721d0a3 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -9,8 +9,7 @@ #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" #include "ray/rpc/worker/core_worker_client.h" - -#include // pid_t +#include "ray/util/process.h" namespace ray { @@ -22,7 +21,8 @@ namespace raylet { class Worker { public: /// A constructor that initializes a worker object. - Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port, + /// NOTE: You MUST manually set the worker process. + Worker(const WorkerID &worker_id, const Language &language, int port, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager); /// A destructor responsible for freeing all worker state. @@ -34,8 +34,9 @@ class Worker { bool IsBlocked() const; /// Return the worker's ID. WorkerID WorkerId() const; - /// Return the worker's PID. - pid_t Pid() const; + /// Return the worker process. + ProcessHandle Process() const; + void SetProcess(const ProcessHandle &proc); Language GetLanguage() const; int Port() const; void AssignTaskId(const TaskID &task_id); @@ -79,8 +80,8 @@ class Worker { private: /// The worker's ID. WorkerID worker_id_; - /// The worker's PID. - pid_t pid_; + /// The worker's process. + ProcessHandle proc_; /// The language type of this worker. Language language_; /// Port that this worker listens on. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 220ead748..9fb922959 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1,14 +1,14 @@ #include "ray/raylet/worker_pool.h" -#ifdef _WIN32 -#include -#include -#endif - #include #include +#include +#include +#include +#include + #include "ray/common/constants.h" #include "ray/common/ray_config.h" #include "ray/common/status.h" @@ -46,19 +46,14 @@ namespace raylet { /// A constructor that initializes a worker pool with num_workers workers for /// each language. -WorkerPool::WorkerPool(int num_workers, int maximum_startup_concurrency, +WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, + int maximum_startup_concurrency, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands) - : maximum_startup_concurrency_(maximum_startup_concurrency), + : io_service_(&io_service), + maximum_startup_concurrency_(maximum_startup_concurrency), gcs_client_(std::move(gcs_client)) { RAY_CHECK(maximum_startup_concurrency > 0); -#ifdef _WIN32 - // TODO(mehrdadn): Is there an equivalent of this we need for Windows? -#else - // Ignore SIGCHLD signals. If we don't do this, then worker processes will - // become zombies instead of dying gracefully. - signal(SIGCHLD, SIG_IGN); -#endif for (const auto &entry : worker_commands) { // Initialize the pool state for this language. auto &state = states_by_lang_[entry.first]; @@ -100,25 +95,21 @@ void WorkerPool::Start(int num_workers) { } WorkerPool::~WorkerPool() { - std::unordered_set pids_to_kill; + std::unordered_set procs_to_kill; for (const auto &entry : states_by_lang_) { // Kill all registered workers. NOTE(swang): This assumes that the registered // workers were started by the pool. for (const auto &worker : entry.second.registered_workers) { - pids_to_kill.insert(worker->Pid()); + procs_to_kill.insert(worker->Process()); } // Kill all the workers that have been started but not registered. for (const auto &starting_worker : entry.second.starting_worker_processes) { - pids_to_kill.insert(starting_worker.first); + procs_to_kill.insert(starting_worker.first); } } - for (const auto &pid : pids_to_kill) { - RAY_CHECK(pid > 0); - kill(pid, SIGKILL); - } - // Waiting for the workers to be killed - for (const auto &pid : pids_to_kill) { - waitpid(pid, NULL, 0); + for (const auto &proc : procs_to_kill) { + proc.get()->terminate(); + proc.get()->wait(); } } @@ -132,8 +123,8 @@ uint32_t WorkerPool::Size(const Language &language) const { } } -int WorkerPool::StartWorkerProcess(const Language &language, - const std::vector &dynamic_options) { +ProcessHandle WorkerPool::StartWorkerProcess( + const Language &language, const std::vector &dynamic_options) { auto &state = GetStateForLanguage(language); // If we are already starting up too many workers, then return without starting // more. @@ -146,7 +137,7 @@ int WorkerPool::StartWorkerProcess(const Language &language, RAY_LOG(DEBUG) << "Worker not started, " << starting_workers << " workers of language type " << static_cast(language) << " pending registration"; - return -1; + return ProcessHandle(); } // Either there are no workers pending registration or the worker start is being forced. RAY_LOG(DEBUG) << "Starting new worker process, current pool has " @@ -194,71 +185,16 @@ int WorkerPool::StartWorkerProcess(const Language &language, << Language_Name(language) << " worker process. But the " << kWorkerNumWorkersPlaceholder << "placeholder is not found in worker command."; - pid_t pid = StartProcess(worker_command_args); - if (pid < 0) { - // Failure case. - RAY_LOG(FATAL) << "Failed to fork worker process: " << strerror(errno); - } else if (pid > 0) { - // Parent process case. - RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start - << " worker(s) with pid " << pid; - state.starting_worker_processes.emplace(pid, workers_to_start); - return pid; - } - return -1; + ProcessHandle proc = StartProcess(worker_command_args); + RAY_CHECK(proc); + RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start + << " worker(s) with pid " << proc.get()->id(); + state.starting_worker_processes.emplace(proc, workers_to_start); + return proc; } -#ifdef _WIN32 -// Fork + exec combo for Windows. Returns -1 on failure. -// TODO(mehrdadn): This is dangerous on Windows. -// We need to keep the actual process handle alive for the PID to stay valid. -// Make this change as soon as possible, or the PID may refer to the wrong process. -static pid_t spawnvp_wrapper(std::vector const &args) { - pid_t pid; - std::vector str_args; - for (const auto &arg : args) { - str_args.push_back(arg.c_str()); - } - str_args.push_back(NULL); - HANDLE handle = (HANDLE)spawnvp(P_NOWAIT, str_args[0], str_args.data()); - if (handle != INVALID_HANDLE_VALUE) { - pid = static_cast(GetProcessId(handle)); - if (pid == 0) { - pid = -1; - } - CloseHandle(handle); - } else { - pid = -1; - errno = EINVAL; - } - return pid; -} -#else -// Fork + exec combo for POSIX. Returns -1 on failure. -static pid_t spawnvp_wrapper(std::vector const &args) { - pid_t pid; - std::vector str_args; - for (const auto &arg : args) { - str_args.push_back(arg.c_str()); - } - str_args.push_back(NULL); - pid = fork(); - if (pid == 0) { - // Child process case. - // Reset the SIGCHLD handler for the worker. - // TODO(mehrdadn): Move any work here to the child process itself - // so that it can also be implemented on Windows. - signal(SIGCHLD, SIG_DFL); - if (execvp(str_args[0], const_cast(str_args.data())) == -1) { - pid = -1; - abort(); // fork() succeeded but exec() failed, so abort the child - } - } - return pid; -} -#endif - -pid_t WorkerPool::StartProcess(const std::vector &worker_command_args) { +ProcessHandle WorkerPool::StartProcess( + const std::vector &worker_command_args) { if (RAY_LOG_ENABLED(DEBUG)) { std::stringstream stream; stream << "Starting worker process with command:"; @@ -269,25 +205,35 @@ pid_t WorkerPool::StartProcess(const std::vector &worker_command_ar } // Launch the process to create the worker. - pid_t pid = spawnvp_wrapper(worker_command_args); - if (pid == -1) { - RAY_LOG(FATAL) << "Failed to start worker with error " << errno << ": " - << strerror(errno); + auto exit_callback = [=](int, const std::error_code &ec) { + // This callback seems to be necessary for proper zombie cleanup. + // However, it doesn't need to do anything. + }; + std::error_code ec; + ProcessHandle child( + std::make_shared(boost::process::args(worker_command_args), *io_service_, + boost::process::on_exit = exit_callback, ec)); + if (!child.get()->valid()) { + child = ProcessHandle(); } - return pid; + if (!child || !child.get()->valid() || ec) { + // The worker failed to start. This is a fatal error. + RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": " + << ec.message(); + } + return child; } -Status WorkerPool::RegisterWorker(const std::shared_ptr &worker) { - const auto pid = worker->Pid(); +Status WorkerPool::RegisterWorker(const std::shared_ptr &worker, pid_t pid) { const auto port = worker->Port(); RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port; auto &state = GetStateForLanguage(worker->GetLanguage()); - - auto it = state.starting_worker_processes.find(pid); + auto it = state.starting_worker_processes.find(ProcessHandle::FromPid(pid)); if (it == state.starting_worker_processes.end()) { RAY_LOG(WARNING) << "Received a register request from an unknown worker " << pid; return Status::Invalid("Unknown worker"); } + worker->SetProcess(it->first); it->second--; if (it->second == 0) { state.starting_worker_processes.erase(it); @@ -332,7 +278,7 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { << "Idle workers cannot have an assigned task ID"; auto &state = GetStateForLanguage(worker->GetLanguage()); - auto it = state.dedicated_workers_to_tasks.find(worker->Pid()); + auto it = state.dedicated_workers_to_tasks.find(worker->Process()); if (it != state.dedicated_workers_to_tasks.end()) { // The worker is used for the actor creation task with dynamic options. // Put it into idle dedicated worker pool. @@ -353,7 +299,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec auto &state = GetStateForLanguage(task_spec.GetLanguage()); std::shared_ptr worker = nullptr; - int pid = -1; + ProcessHandle proc; if (task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) { // Code path of actor creation task with dynamic worker options. // Try to pop it from idle dedicated pool. @@ -364,15 +310,16 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec state.idle_dedicated_workers.erase(it); // Because we found a worker that can perform this task, // we can remove it from dedicated_workers_to_tasks. - state.dedicated_workers_to_tasks.erase(worker->Pid()); + state.dedicated_workers_to_tasks.erase(worker->Process()); state.tasks_to_dedicated_workers.erase(task_spec.TaskId()); } else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) { // We are not pending a registration from a worker for this task, // so start a new worker process for this task. - pid = StartWorkerProcess(task_spec.GetLanguage(), task_spec.DynamicWorkerOptions()); - if (pid > 0) { - state.dedicated_workers_to_tasks[pid] = task_spec.TaskId(); - state.tasks_to_dedicated_workers[task_spec.TaskId()] = pid; + proc = + StartWorkerProcess(task_spec.GetLanguage(), task_spec.DynamicWorkerOptions()); + if (proc) { + state.dedicated_workers_to_tasks[proc] = task_spec.TaskId(); + state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc; } } } else if (!task_spec.IsActorTask()) { @@ -383,7 +330,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } else { // There are no more non-actor workers available to execute this task. // Start a new worker process. - pid = StartWorkerProcess(task_spec.GetLanguage()); + proc = StartWorkerProcess(task_spec.GetLanguage()); } } else { // Code path of actor task. @@ -395,7 +342,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } } - if (worker == nullptr && pid > 0) { + if (worker == nullptr && proc) { WarnAboutSize(); } @@ -408,7 +355,7 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { stats::CurrentWorker().Record( 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->Pid())}}); + {stats::WorkerPidKey, std::to_string(worker->Process().get()->id())}}); return RemoveWorker(state.idle, worker); } @@ -418,7 +365,7 @@ void WorkerPool::DisconnectDriver(const std::shared_ptr &driver) { RAY_CHECK(RemoveWorker(state.registered_drivers, driver)); stats::CurrentDriver().Record( 0, {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->Pid())}}); + {stats::WorkerPidKey, std::to_string(driver->Process().get()->id())}}); } inline WorkerPool::State &WorkerPool::GetStateForLanguage(const Language &language) { @@ -534,15 +481,17 @@ void WorkerPool::RecordMetrics() const { // Record worker. for (auto worker : entry.second.registered_workers) { stats::CurrentWorker().Record( - worker->Pid(), {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->Pid())}}); + worker->Process().get()->id(), + {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(worker->Process().get()->id())}}); } // Record driver. for (auto driver : entry.second.registered_drivers) { stats::CurrentDriver().Record( - driver->Pid(), {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->Pid())}}); + driver->Process().get()->id(), + {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(driver->Process().get()->id())}}); } } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 8af5b4f15..267c88647 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -5,6 +5,9 @@ #include #include #include + +#include + #include "gtest/gtest.h" #include "ray/common/client_connection.h" @@ -40,7 +43,8 @@ class WorkerPool { /// resources on the machine). /// \param worker_commands The commands used to start the worker process, grouped by /// language. - WorkerPool(int num_workers, int maximum_startup_concurrency, + WorkerPool(boost::asio::io_service &io_service, int num_workers, + int maximum_startup_concurrency, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands); @@ -52,7 +56,7 @@ class WorkerPool { /// /// \param The Worker to be registered. /// \return If the registration is successful. - Status RegisterWorker(const std::shared_ptr &worker); + Status RegisterWorker(const std::shared_ptr &worker, pid_t pid); /// Register a new driver. /// @@ -157,14 +161,16 @@ class WorkerPool { /// \param dynamic_options The dynamic options that we should add for worker command. /// \return The id of the process that we started if it's positive, /// otherwise it means we didn't start a process. - int StartWorkerProcess(const Language &language, - const std::vector &dynamic_options = {}); + ProcessHandle StartWorkerProcess(const Language &language, + const std::vector &dynamic_options = {}); /// The implementation of how to start a new worker process with command arguments. + /// The lifetime of the process is tied to that of the returned object, + /// unless the caller manually detaches the process after the call. /// /// \param worker_command_args The command arguments of new worker process. - /// \return The process ID of started worker process. - virtual pid_t StartProcess(const std::vector &worker_command_args); + /// \return An object representing the started worker process. + virtual ProcessHandle StartProcess(const std::vector &worker_command_args); /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); @@ -189,12 +195,12 @@ class WorkerPool { std::unordered_set> registered_drivers; /// A map from the pids of starting worker processes /// to the number of their unregistered workers. - std::unordered_map starting_worker_processes; + std::unordered_map starting_worker_processes; /// A map for looking up the task with dynamic options by the pid of /// worker. Note that this is used for the dedicated worker processes. - std::unordered_map dedicated_workers_to_tasks; + std::unordered_map dedicated_workers_to_tasks; /// A map for speeding up looking up the pending worker for the given task. - std::unordered_map tasks_to_dedicated_workers; + std::unordered_map tasks_to_dedicated_workers; /// We'll push a warning to the user every time a multiple of this many /// worker processes has been started. int multiple_for_warning; @@ -217,6 +223,8 @@ class WorkerPool { /// for a given language. State &GetStateForLanguage(const Language &language); + /// Required by Boost.Process for managing subprocesses (e.g. reaping zombies). + boost::asio::io_service *io_service_; /// The maximum number of worker processes that can be started concurrently. int maximum_startup_concurrency_; /// A client connection to the GCS. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index e5c2b7346..6ea279401 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -16,17 +16,19 @@ std::vector LANGUAGES = {Language::PYTHON, Language::JAVA}; class WorkerPoolMock : public WorkerPool { public: - WorkerPoolMock() + WorkerPoolMock(boost::asio::io_service &io_service) : WorkerPoolMock( + io_service, {{Language::PYTHON, {"dummy_py_worker_command", "--foo=RAY_WORKER_NUM_WORKERS_PLACEHOLDER"}}, {Language::JAVA, {"dummy_java_worker_command", "--foo=RAY_WORKER_NUM_WORKERS_PLACEHOLDER"}}}) {} - explicit WorkerPoolMock(const WorkerCommandMap &worker_commands) - : WorkerPool(0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands), - last_worker_pid_(0) { + explicit WorkerPoolMock(boost::asio::io_service &io_service, + const WorkerCommandMap &worker_commands) + : WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands), + last_worker_process_() { for (auto &entry : states_by_lang_) { entry.second.num_workers_per_process = NUM_WORKERS_PER_PROCESS; } @@ -37,23 +39,30 @@ class WorkerPoolMock : public WorkerPool { states_by_lang_.clear(); } - void StartWorkerProcess(const Language &language, - const std::vector &dynamic_options = {}) { - WorkerPool::StartWorkerProcess(language, dynamic_options); - } + using WorkerPool::StartWorkerProcess; // we need this to be public for testing - pid_t StartProcess(const std::vector &worker_command_args) override { - last_worker_pid_ += 1; - worker_commands_by_pid[last_worker_pid_] = worker_command_args; - return last_worker_pid_; + ProcessHandle StartProcess( + const std::vector &worker_command_args) override { +#ifndef PID_MAX_LIMIT + // This is defined by Linux to be the maximum allowable number of processes + // There's no guarantee for other OSes, but it's good enough for testing... + enum { PID_MAX_LIMIT = 1 << 22 }; +#endif + // Use a bogus process ID that won't conflict with those in the system + pid_t pid = static_cast(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size()); + Process proc(pid); + proc.detach(); + last_worker_process_ = std::make_shared(std::move(proc)); + worker_commands_by_proc_[last_worker_process_] = worker_command_args; + return last_worker_process_; } void WarnAboutSize() override {} - pid_t LastStartedWorkerProcess() const { return last_worker_pid_; } + ProcessHandle LastStartedWorkerProcess() const { return last_worker_process_; } - const std::vector &GetWorkerCommand(int pid) { - return worker_commands_by_pid[pid]; + const std::vector &GetWorkerCommand(ProcessHandle proc) { + return worker_commands_by_proc_[proc]; } int NumWorkersStarting() const { @@ -75,20 +84,19 @@ class WorkerPoolMock : public WorkerPool { } private: - int last_worker_pid_; - // The worker commands by pid. - std::unordered_map> worker_commands_by_pid; + ProcessHandle last_worker_process_; + // The worker commands by process. + std::unordered_map> worker_commands_by_proc_; }; class WorkerPoolTest : public ::testing::Test { public: WorkerPoolTest() - : worker_pool_(), - io_service_(), + : worker_pool_(io_service_), error_message_type_(1), client_call_manager_(io_service_) {} - std::shared_ptr CreateWorker(pid_t pid, + std::shared_ptr CreateWorker(ProcessHandle proc, const Language &language = Language::PYTHON) { std::function client_handler = [this](LocalClientConnection &client) { HandleNewClient(client); }; @@ -101,18 +109,22 @@ class WorkerPoolTest : public ::testing::Test { auto client = LocalClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); - return std::shared_ptr(new Worker(WorkerID::FromRandom(), pid, language, -1, - client, client_call_manager_)); + std::shared_ptr worker = std::make_shared( + WorkerID::FromRandom(), language, -1, client, client_call_manager_); + if (proc) { + worker->SetProcess(proc); + } + return worker; } void SetWorkerCommands(const WorkerCommandMap &worker_commands) { - WorkerPoolMock worker_pool(worker_commands); + WorkerPoolMock worker_pool(io_service_, worker_commands); this->worker_pool_ = std::move(worker_pool); } protected: - WorkerPoolMock worker_pool_; boost::asio::io_service io_service_; + WorkerPoolMock worker_pool_; int64_t error_message_type_; rpc::ClientCallManager client_call_manager_; @@ -142,12 +154,21 @@ static inline TaskSpecification ExampleTaskSpec( return TaskSpecification(std::move(message)); } +TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) { + typedef ProcessHandle T; + T a(std::make_shared()), b(std::make_shared()), empty = T(); + ASSERT_TRUE(std::equal_to()(a, a)); + ASSERT_TRUE(!std::equal_to()(a, b)); + ASSERT_TRUE(!std::equal_to()(b, a)); + ASSERT_TRUE(!std::equal_to()(empty, a)); + ASSERT_TRUE(!std::equal_to()(a, empty)); +} + TEST_F(WorkerPoolTest, HandleWorkerRegistration) { - worker_pool_.StartWorkerProcess(Language::PYTHON); - pid_t pid = worker_pool_.LastStartedWorkerProcess(); + ProcessHandle proc = worker_pool_.StartWorkerProcess(Language::PYTHON); std::vector> workers; for (int i = 0; i < NUM_WORKERS_PER_PROCESS; i++) { - workers.push_back(CreateWorker(pid)); + workers.push_back(CreateWorker(ProcessHandle())); } for (const auto &worker : workers) { // Check that there's still a starting worker process @@ -155,7 +176,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 1); // Check that we cannot lookup the worker before it's registered. ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); - RAY_CHECK_OK(worker_pool_.RegisterWorker(worker)); + RAY_CHECK_OK(worker_pool_.RegisterWorker(worker, proc.get()->id())); // Check that we can lookup the worker after it's registered. ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker); } @@ -181,20 +202,21 @@ TEST_F(WorkerPoolTest, StartupWorkerProcessCount) { ASSERT_TRUE(expected_worker_process_count < static_cast(desired_initial_worker_process_count_per_language * LANGUAGES.size())); - pid_t last_started_worker_process = 0; + ProcessHandle last_started_worker_process; for (int i = 0; i < desired_initial_worker_process_count_per_language; i++) { for (size_t j = 0; j < LANGUAGES.size(); j++) { worker_pool_.StartWorkerProcess(LANGUAGES[j]); ASSERT_TRUE(worker_pool_.NumWorkerProcessesStarting() <= expected_worker_process_count); - if (last_started_worker_process != worker_pool_.LastStartedWorkerProcess()) { - last_started_worker_process = worker_pool_.LastStartedWorkerProcess(); + ProcessHandle prev = worker_pool_.LastStartedWorkerProcess(); + if (last_started_worker_process.get() != prev.get()) { + last_started_worker_process = prev; const auto &real_command = worker_pool_.GetWorkerCommand(worker_pool_.LastStartedWorkerProcess()); ASSERT_EQ(real_command, worker_commands[j]); } else { - ASSERT_TRUE(worker_pool_.NumWorkerProcessesStarting() == - expected_worker_process_count); + ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), + expected_worker_process_count); ASSERT_TRUE(static_cast(i * LANGUAGES.size() + j) >= expected_worker_process_count); } @@ -224,8 +246,8 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { // Create some workers. std::unordered_set> workers; - workers.insert(CreateWorker(1234)); - workers.insert(CreateWorker(5678)); + workers.insert(CreateWorker(std::make_shared())); + workers.insert(CreateWorker(std::make_shared())); // Add the workers to the pool. for (auto &worker : workers) { worker_pool_.PushWorker(worker); @@ -244,7 +266,7 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { TEST_F(WorkerPoolTest, PopActorWorker) { // Create a worker. - auto worker = CreateWorker(1234); + auto worker = CreateWorker(std::make_shared()); // Add the worker to the pool. worker_pool_.PushWorker(worker); @@ -267,7 +289,7 @@ TEST_F(WorkerPoolTest, PopActorWorker) { TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { // Create a Python Worker, and add it to the pool - auto py_worker = CreateWorker(1234, Language::PYTHON); + auto py_worker = CreateWorker(std::make_shared(), Language::PYTHON); worker_pool_.PushWorker(py_worker); // Check that no worker will be popped if the given task is a Java task const auto java_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA); @@ -277,7 +299,7 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { ASSERT_NE(worker_pool_.PopWorker(py_task_spec), nullptr); // Create a Java Worker, and add it to the pool - auto java_worker = CreateWorker(1234, Language::JAVA); + auto java_worker = CreateWorker(std::make_shared(), Language::JAVA); worker_pool_.PushWorker(java_worker); // Check that the worker will be popped now for Java task ASSERT_NE(worker_pool_.PopWorker(java_task_spec), nullptr); diff --git a/src/ray/util/process.h b/src/ray/util/process.h new file mode 100644 index 000000000..c34571f53 --- /dev/null +++ b/src/ray/util/process.h @@ -0,0 +1,123 @@ +#ifndef RAY_UTIL_PROCESS_H +#define RAY_UTIL_PROCESS_H + +#ifdef __linux__ +#include +#include +#include +#endif + +#include +#include +#include +#include +#include +#include +#include + +// We only define operators required by the standard library (==, hash). +// We declare but avoid defining the rest so that they're not used by accident. + +namespace ray { + +typedef boost::process::pid_t pid_t; + +class Process : public boost::process::child { + protected: + class ProcessFD { + // This class makes a best-effort attempt to keep a PID alive. + // However, it cannot make any guarantees. + // The kernel might not even support this mechanism. + // See here: https://unix.stackexchange.com/a/181249 +#ifdef __linux__ + int fd_; +#endif + public: +#ifdef __linux__ + ~ProcessFD() { + if (fd_ != -1) { + ::close(fd_); + } + } + ProcessFD(pid_t pid) : fd_(-1) { + if (pid != -1) { + char path[64]; + sprintf(path, "/proc/%d/ns/pid", static_cast(pid)); + fd_ = ::open(path, O_RDONLY); + } + } + ProcessFD(ProcessFD &&other) : fd_(std::move(other.fd_)) { other.fd_ = -1; } + ProcessFD(const ProcessFD &other) : fd_(other.fd_ != -1 ? ::dup(other.fd_) : -1) {} + ProcessFD &operator=(ProcessFD other) { + using std::swap; + swap(fd_, other.fd_); + return *this; + } +#else + ProcessFD(pid_t) {} +#endif + }; + ProcessFD fd_; + + public: + template + explicit Process(T &&... args) + : boost::process::child(std::forward(args)...), + fd_(boost::process::child::id()) {} +}; + +/// A managed equivalent of a pid_t (to manage the lifetime of each process). +/// TODO(mehrdadn): This hasn't been a great design, but we play along to +/// minimize the changes needed for Windows compatibility. +/// (We used to represent a worker process by just its pid_t, which carries +/// no ownership/lifetime information.) +/// Once this code is running properly, refactor the data structures to create +/// a better ownership structure between the worker processes and the workers. +class ProcessHandle { + std::shared_ptr proc_; + + public: + ProcessHandle(const std::shared_ptr &proc = std::shared_ptr()) + : proc_(proc) {} + Process *get() const { return proc_.get(); } + explicit operator bool() const { return !!proc_; } + static ProcessHandle FromPid(pid_t pid) { + Process temp(pid); + temp.detach(); + return std::make_shared(std::move(temp)); + } +}; + +} // namespace ray + +// Define comparators for process handles: +// - Valid process objects must be distinguished by their IDs. +// - Invalid process objects must be distinguished by their addresses. +namespace std { + +template <> +struct equal_to { + bool operator()(const ray::ProcessHandle &x, const ray::ProcessHandle &y) const { + const ray::Process *a = x.get(), *b = y.get(); + // See explanation above + return a ? b ? a->valid() + ? b->valid() ? equal_to()(a->id(), b->id()) : false + : b->valid() ? false : equal_to()(a, b) + : false + : !b; + } +}; + +template <> +struct hash { + size_t operator()(const ray::ProcessHandle &value) const { + const ray::Process *p = value.get(); + // See explanation above + return p ? p->valid() ? hash()(p->id()) : hash()(p) + : size_t(); + } +}; + +} // namespace std + +#endif diff --git a/thirdparty/patches/boost-process-teminate-waitpid-nohang.patch b/thirdparty/patches/boost-process-teminate-waitpid-nohang.patch new file mode 100644 index 000000000..908dd34dc --- /dev/null +++ b/thirdparty/patches/boost-process-teminate-waitpid-nohang.patch @@ -0,0 +1,14 @@ +From 28126b3432433c025606a84474c7afb5dec88daf Mon Sep 17 00:00:00 2001 +From: Klemens David Morgenstern +Date: Sun, 12 May 2019 17:02:25 +0700 +Subject: [PATCH] osx fix + +--- +diff --git include/boost/process/detail/posix/terminate.hpp include/boost/process/detail/posix/terminate.hpp +index 84024a5..e1e5f33 100644 +--- boost/process/detail/posix/terminate.hpp ++++ boost/process/detail/posix/terminate.hpp +@@ -30,1 +30,1 @@ +- ::waitpid(p.pid, &status, 0); //just to clean it up ++ ::waitpid(p.pid, &status, WNOHANG); //just to clean it up +--