diff --git a/BUILD.bazel b/BUILD.bazel index 2d99efd65..df47cf0ae 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -399,7 +399,6 @@ cc_library( ":stats_lib", ":worker_rpc", "@boost//:asio", - "@boost//:process", "@com_github_jupp0r_prometheus_cpp//pull", "@com_google_absl//absl/base:core_headers", "@com_google_absl//absl/container:flat_hash_set", diff --git a/bazel/ray_deps_setup.bzl b/bazel/ray_deps_setup.bzl index 60119d7fd..e4770090d 100644 --- a/bazel/ray_deps_setup.bzl +++ b/bazel/ray_deps_setup.bzl @@ -123,8 +123,6 @@ def ray_deps_setup(): # Backport Clang-Cl patch on Boost 1.69 to Boost <= 1.68: # https://lists.boost.org/Archives/boost/2018/09/243420.php "//thirdparty/patches:boost-type_traits-trivial_move.patch", - # Partially backport waitpid() patch on Boost 1.72 to Boost <= 1.68 - "//thirdparty/patches:boost-process-teminate-waitpid-nohang.patch", ], ) diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index cdd0ff6ba..aac2f44d7 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -39,7 +39,7 @@ def wait_for_pid_to_exit(pid, timeout=20): return time.sleep(0.1) raise RayTestTimeoutException( - "Timed out while waiting for process {} to exit.".format(pid)) + "Timed out while waiting for process to exit.") def wait_for_children_of_pid(pid, num_children=1, timeout=20): @@ -51,8 +51,8 @@ def wait_for_children_of_pid(pid, num_children=1, timeout=20): return time.sleep(0.1) raise RayTestTimeoutException( - "Timed out while waiting for process {} children to start " - "({}/{} started).".format(pid, num_alive, num_children)) + "Timed out while waiting for process children to start " + "({}/{} started).".format(num_alive, num_children)) def wait_for_children_of_pid_to_exit(pid, timeout=20): diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 1ece8f99a..6f0908adc 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -89,9 +89,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, object_manager_profile_timer_(io_service), initial_config_(config), local_available_resources_(config.resource_config), - worker_pool_(io_service, config.num_initial_workers, - config.maximum_startup_concurrency, gcs_client_, - config.worker_commands), + worker_pool_(config.num_initial_workers, config.maximum_startup_concurrency, + gcs_client_, config.worker_commands), scheduling_policy_(local_queues_), reconstruction_policy_( io_service_, @@ -229,23 +228,22 @@ void NodeManager::HandleUnexpectedWorkerFailure( } void NodeManager::KillWorker(std::shared_ptr worker) { -#ifdef _WIN32 - // TODO(mehrdadn): Implement implement graceful process termination mechanism -#else // If we're just cleaning up a single worker, allow it some time to clean // up its state before force killing. The client socket will be closed // and the worker struct will be freed after the timeout. - kill(worker->Process().get()->id(), SIGTERM); -#endif + kill(worker->Pid(), SIGTERM); auto retry_timer = std::make_shared(io_service_); auto retry_duration = boost::posix_time::milliseconds( RayConfig::instance().kill_worker_timeout_milliseconds()); retry_timer->expires_from_now(retry_duration); retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) { - RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Process().get()->id(); - // Force kill worker - worker->Process().get()->terminate(); + RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Pid(); + // Force kill worker. TODO(mehrdadn, rkn): The worker may have already died + // and had its PID reassigned to a different process, at least on Windows. + // On Linux, this may or may not be the case, depending on e.g. whether + // the process has been already waited on. Regardless, this must be fixed. + kill(worker->Pid(), SIGKILL); }); } @@ -857,9 +855,8 @@ void NodeManager::ProcessClientMessage( RAY_LOG(DEBUG) << "[Worker] Message " << protocol::EnumNameMessageType(message_type_value) << "(" << message_type << ") from worker with PID " - << (registered_worker - ? std::to_string(registered_worker->Process().get()->id()) - : "nil"); + << (registered_worker ? std::to_string(registered_worker->Pid()) + : "nil"); if (registered_worker && registered_worker->IsDead()) { // For a worker that is marked as dead (because the job has died already), // all the messages are ignored except DisconnectClient. @@ -966,6 +963,12 @@ void NodeManager::ProcessClientMessage( void NodeManager::ProcessRegisterClientRequestMessage( const std::shared_ptr &client, const uint8_t *message_data) { client->Register(); + auto message = flatbuffers::GetRoot(message_data); + Language language = static_cast(message->language()); + WorkerID worker_id = from_flatbuf(*message->worker_id()); + auto worker = std::make_shared(worker_id, message->worker_pid(), language, + message->port(), client, client_call_manager_); + Status status; flatbuffers::FlatBufferBuilder fbb; auto reply = ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_)); @@ -980,31 +983,24 @@ void NodeManager::ProcessRegisterClientRequestMessage( } }); - auto message = flatbuffers::GetRoot(message_data); - Language language = static_cast(message->language()); - WorkerID worker_id = from_flatbuf(*message->worker_id()); - pid_t pid = message->worker_pid(); - auto worker = std::make_shared(worker_id, language, message->port(), client, - client_call_manager_); if (message->is_worker()) { // Register the new worker. - if (worker_pool_.RegisterWorker(worker, pid).ok()) { + if (worker_pool_.RegisterWorker(worker).ok()) { HandleWorkerAvailable(worker->Connection()); } } else { // Register the new driver. - worker->SetProcess(ProcessHandle::FromPid(pid)); const JobID job_id = from_flatbuf(*message->job_id()); // Compute a dummy driver task id from a given driver. const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id); worker->AssignTaskId(driver_task_id); worker->AssignJobId(job_id); - Status status = worker_pool_.RegisterDriver(worker); + status = worker_pool_.RegisterDriver(worker); if (status.ok()) { local_queues_.AddDriverTaskId(driver_task_id); - auto job_data_ptr = - gcs::CreateJobTableData(job_id, /*is_dead*/ false, std::time(nullptr), - initial_config_.node_manager_address, pid); + auto job_data_ptr = gcs::CreateJobTableData( + job_id, /*is_dead*/ false, std::time(nullptr), + initial_config_.node_manager_address, message->worker_pid()); RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr)); } } @@ -1200,8 +1196,7 @@ void NodeManager::ProcessDisconnectClientMessage( cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); worker->ResetLifetimeResourceIds(); - RAY_LOG(DEBUG) << "Worker (pid=" << worker->Process().get()->id() - << ") is disconnected. " + RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. " << "job_id: " << worker->GetAssignedJobId(); // Since some resources may have been released, we can try to dispatch more tasks. @@ -1215,8 +1210,7 @@ void NodeManager::ProcessDisconnectClientMessage( local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id)); worker_pool_.DisconnectDriver(worker); - RAY_LOG(DEBUG) << "Driver (pid=" << worker->Process().get()->id() - << ") is disconnected. " + RAY_LOG(DEBUG) << "Driver (pid=" << worker->Pid() << ") is disconnected. " << "job_id: " << job_id; } @@ -2296,8 +2290,7 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & } RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid " - << worker->Process().get()->id() - << ", worker id: " << worker->WorkerId(); + << worker->Pid() << ", worker id: " << worker->WorkerId(); flatbuffers::FlatBufferBuilder fbb; // Resource accounting: acquire resources for the assigned task. @@ -3128,7 +3121,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, rpc::SendReplyCallback send_reply_callback) { for (const auto &driver : worker_pool_.GetAllDrivers()) { auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(driver->Process().get()->id()); + worker_stats->set_pid(driver->Pid()); worker_stats->set_is_driver(true); } for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { @@ -3191,7 +3184,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, << status.ToString(); } else { auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(worker->Process().get()->id()); + worker_stats->set_pid(worker->Pid()); worker_stats->set_is_driver(false); reply->set_num_workers(reply->num_workers() + 1); worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index ec850ee66..a02e65a78 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -12,10 +12,11 @@ namespace ray { namespace raylet { /// A constructor responsible for initializing the state of a worker. -Worker::Worker(const WorkerID &worker_id, const Language &language, int port, +Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager) : worker_id_(worker_id), + pid_(pid), language_(language), port_(port), connection_(connection), @@ -41,12 +42,7 @@ bool Worker::IsBlocked() const { return blocked_; } WorkerID Worker::WorkerId() const { return worker_id_; } -ProcessHandle Worker::Process() const { return proc_; } - -void Worker::SetProcess(const ProcessHandle &proc) { - RAY_CHECK(!proc_); // this procedure should not be called multiple times - proc_ = proc; -} +pid_t Worker::Pid() const { return pid_; } Language Worker::GetLanguage() const { return language_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 91721d0a3..fde77f4a2 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -9,7 +9,8 @@ #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" #include "ray/rpc/worker/core_worker_client.h" -#include "ray/util/process.h" + +#include // pid_t namespace ray { @@ -21,8 +22,7 @@ namespace raylet { class Worker { public: /// A constructor that initializes a worker object. - /// NOTE: You MUST manually set the worker process. - Worker(const WorkerID &worker_id, const Language &language, int port, + Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager); /// A destructor responsible for freeing all worker state. @@ -34,9 +34,8 @@ class Worker { bool IsBlocked() const; /// Return the worker's ID. WorkerID WorkerId() const; - /// Return the worker process. - ProcessHandle Process() const; - void SetProcess(const ProcessHandle &proc); + /// Return the worker's PID. + pid_t Pid() const; Language GetLanguage() const; int Port() const; void AssignTaskId(const TaskID &task_id); @@ -80,8 +79,8 @@ class Worker { private: /// The worker's ID. WorkerID worker_id_; - /// The worker's process. - ProcessHandle proc_; + /// The worker's PID. + pid_t pid_; /// The language type of this worker. Language language_; /// Port that this worker listens on. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index be4c33b5a..3d8b6c45f 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1,14 +1,14 @@ #include "ray/raylet/worker_pool.h" +#ifdef _WIN32 +#include +#include +#endif + #include #include -#include -#include -#include -#include - #include "ray/common/constants.h" #include "ray/common/ray_config.h" #include "ray/common/status.h" @@ -46,14 +46,19 @@ namespace raylet { /// A constructor that initializes a worker pool with num_workers workers for /// each language. -WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, - int maximum_startup_concurrency, +WorkerPool::WorkerPool(int num_workers, int maximum_startup_concurrency, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands) - : io_service_(&io_service), - maximum_startup_concurrency_(maximum_startup_concurrency), + : maximum_startup_concurrency_(maximum_startup_concurrency), gcs_client_(std::move(gcs_client)) { RAY_CHECK(maximum_startup_concurrency > 0); +#ifdef _WIN32 + // TODO(mehrdadn): Is there an equivalent of this we need for Windows? +#else + // Ignore SIGCHLD signals. If we don't do this, then worker processes will + // become zombies instead of dying gracefully. + signal(SIGCHLD, SIG_IGN); +#endif for (const auto &entry : worker_commands) { // Initialize the pool state for this language. auto &state = states_by_lang_[entry.first]; @@ -95,21 +100,25 @@ void WorkerPool::Start(int num_workers) { } WorkerPool::~WorkerPool() { - std::unordered_set procs_to_kill; + std::unordered_set pids_to_kill; for (const auto &entry : states_by_lang_) { // Kill all registered workers. NOTE(swang): This assumes that the registered // workers were started by the pool. for (const auto &worker : entry.second.registered_workers) { - procs_to_kill.insert(worker->Process()); + pids_to_kill.insert(worker->Pid()); } // Kill all the workers that have been started but not registered. for (const auto &starting_worker : entry.second.starting_worker_processes) { - procs_to_kill.insert(starting_worker.first); + pids_to_kill.insert(starting_worker.first); } } - for (const auto &proc : procs_to_kill) { - proc.get()->terminate(); - proc.get()->wait(); + for (const auto &pid : pids_to_kill) { + RAY_CHECK(pid > 0); + kill(pid, SIGKILL); + } + // Waiting for the workers to be killed + for (const auto &pid : pids_to_kill) { + waitpid(pid, NULL, 0); } } @@ -123,8 +132,8 @@ uint32_t WorkerPool::Size(const Language &language) const { } } -ProcessHandle WorkerPool::StartWorkerProcess( - const Language &language, const std::vector &dynamic_options) { +int WorkerPool::StartWorkerProcess(const Language &language, + const std::vector &dynamic_options) { auto &state = GetStateForLanguage(language); // If we are already starting up too many workers, then return without starting // more. @@ -137,7 +146,7 @@ ProcessHandle WorkerPool::StartWorkerProcess( RAY_LOG(DEBUG) << "Worker not started, " << starting_workers << " workers of language type " << static_cast(language) << " pending registration"; - return ProcessHandle(); + return -1; } // Either there are no workers pending registration or the worker start is being forced. RAY_LOG(DEBUG) << "Starting new worker process, current pool has " @@ -185,16 +194,71 @@ ProcessHandle WorkerPool::StartWorkerProcess( << Language_Name(language) << " worker process. But the " << kWorkerNumWorkersPlaceholder << "placeholder is not found in worker command."; - ProcessHandle proc = StartProcess(worker_command_args); - RAY_CHECK(proc); - RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start - << " worker(s) with pid " << proc.get()->id(); - state.starting_worker_processes.emplace(proc, workers_to_start); - return proc; + pid_t pid = StartProcess(worker_command_args); + if (pid < 0) { + // Failure case. + RAY_LOG(FATAL) << "Failed to fork worker process: " << strerror(errno); + } else if (pid > 0) { + // Parent process case. + RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start + << " worker(s) with pid " << pid; + state.starting_worker_processes.emplace(pid, workers_to_start); + return pid; + } + return -1; } -ProcessHandle WorkerPool::StartProcess( - const std::vector &worker_command_args) { +#ifdef _WIN32 +// Fork + exec combo for Windows. Returns -1 on failure. +// TODO(mehrdadn): This is dangerous on Windows. +// We need to keep the actual process handle alive for the PID to stay valid. +// Make this change as soon as possible, or the PID may refer to the wrong process. +static pid_t spawnvp_wrapper(std::vector const &args) { + pid_t pid; + std::vector str_args; + for (const auto &arg : args) { + str_args.push_back(arg.c_str()); + } + str_args.push_back(NULL); + HANDLE handle = (HANDLE)spawnvp(P_NOWAIT, str_args[0], str_args.data()); + if (handle != INVALID_HANDLE_VALUE) { + pid = static_cast(GetProcessId(handle)); + if (pid == 0) { + pid = -1; + } + CloseHandle(handle); + } else { + pid = -1; + errno = EINVAL; + } + return pid; +} +#else +// Fork + exec combo for POSIX. Returns -1 on failure. +static pid_t spawnvp_wrapper(std::vector const &args) { + pid_t pid; + std::vector str_args; + for (const auto &arg : args) { + str_args.push_back(arg.c_str()); + } + str_args.push_back(NULL); + pid = fork(); + if (pid == 0) { + // Child process case. + // Reset the SIGCHLD handler for the worker. + // TODO(mehrdadn): Move any work here to the child process itself + // so that it can also be implemented on Windows. + signal(SIGCHLD, SIG_DFL); + if (execvp(str_args[0], const_cast(str_args.data())) == -1) { + pid = -1; + abort(); // fork() succeeded but exec() failed, so abort the child + } + } + return pid; +} +#endif + +pid_t WorkerPool::StartProcess(const std::vector &worker_command_args) { if (RAY_LOG_ENABLED(DEBUG)) { std::stringstream stream; stream << "Starting worker process with command:"; @@ -205,35 +269,25 @@ ProcessHandle WorkerPool::StartProcess( } // Launch the process to create the worker. - auto exit_callback = [=](int, const std::error_code &ec) { - // This callback seems to be necessary for proper zombie cleanup. - // However, it doesn't need to do anything. - }; - std::error_code ec; - ProcessHandle child( - std::make_shared(boost::process::args(worker_command_args), *io_service_, - boost::process::on_exit = exit_callback, ec)); - if (!child.get()->valid()) { - child = ProcessHandle(); + pid_t pid = spawnvp_wrapper(worker_command_args); + if (pid == -1) { + RAY_LOG(FATAL) << "Failed to start worker with error " << errno << ": " + << strerror(errno); } - if (!child || !child.get()->valid() || ec) { - // The worker failed to start. This is a fatal error. - RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": " - << ec.message(); - } - return child; + return pid; } -Status WorkerPool::RegisterWorker(const std::shared_ptr &worker, pid_t pid) { +Status WorkerPool::RegisterWorker(const std::shared_ptr &worker) { + const auto pid = worker->Pid(); const auto port = worker->Port(); RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port; auto &state = GetStateForLanguage(worker->GetLanguage()); - auto it = state.starting_worker_processes.find(ProcessHandle::FromPid(pid)); + + auto it = state.starting_worker_processes.find(pid); if (it == state.starting_worker_processes.end()) { RAY_LOG(WARNING) << "Received a register request from an unknown worker " << pid; return Status::Invalid("Unknown worker"); } - worker->SetProcess(it->first); it->second--; if (it->second == 0) { state.starting_worker_processes.erase(it); @@ -278,7 +332,7 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { << "Idle workers cannot have an assigned task ID"; auto &state = GetStateForLanguage(worker->GetLanguage()); - auto it = state.dedicated_workers_to_tasks.find(worker->Process()); + auto it = state.dedicated_workers_to_tasks.find(worker->Pid()); if (it != state.dedicated_workers_to_tasks.end()) { // The worker is used for the actor creation task with dynamic options. // Put it into idle dedicated worker pool. @@ -299,7 +353,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec auto &state = GetStateForLanguage(task_spec.GetLanguage()); std::shared_ptr worker = nullptr; - ProcessHandle proc; + int pid = -1; if (task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) { // Code path of actor creation task with dynamic worker options. // Try to pop it from idle dedicated pool. @@ -310,16 +364,15 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec state.idle_dedicated_workers.erase(it); // Because we found a worker that can perform this task, // we can remove it from dedicated_workers_to_tasks. - state.dedicated_workers_to_tasks.erase(worker->Process()); + state.dedicated_workers_to_tasks.erase(worker->Pid()); state.tasks_to_dedicated_workers.erase(task_spec.TaskId()); } else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) { // We are not pending a registration from a worker for this task, // so start a new worker process for this task. - proc = - StartWorkerProcess(task_spec.GetLanguage(), task_spec.DynamicWorkerOptions()); - if (proc) { - state.dedicated_workers_to_tasks[proc] = task_spec.TaskId(); - state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc; + pid = StartWorkerProcess(task_spec.GetLanguage(), task_spec.DynamicWorkerOptions()); + if (pid > 0) { + state.dedicated_workers_to_tasks[pid] = task_spec.TaskId(); + state.tasks_to_dedicated_workers[task_spec.TaskId()] = pid; } } } else if (!task_spec.IsActorTask()) { @@ -330,7 +383,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } else { // There are no more non-actor workers available to execute this task. // Start a new worker process. - proc = StartWorkerProcess(task_spec.GetLanguage()); + pid = StartWorkerProcess(task_spec.GetLanguage()); } } else { // Code path of actor task. @@ -342,7 +395,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } } - if (worker == nullptr && proc) { + if (worker == nullptr && pid > 0) { WarnAboutSize(); } @@ -355,7 +408,7 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { stats::CurrentWorker().Record( 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->Process().get()->id())}}); + {stats::WorkerPidKey, std::to_string(worker->Pid())}}); return RemoveWorker(state.idle, worker); } @@ -365,7 +418,7 @@ void WorkerPool::DisconnectDriver(const std::shared_ptr &driver) { RAY_CHECK(RemoveWorker(state.registered_drivers, driver)); stats::CurrentDriver().Record( 0, {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->Process().get()->id())}}); + {stats::WorkerPidKey, std::to_string(driver->Pid())}}); } inline WorkerPool::State &WorkerPool::GetStateForLanguage(const Language &language) { @@ -481,17 +534,15 @@ void WorkerPool::RecordMetrics() const { // Record worker. for (auto worker : entry.second.registered_workers) { stats::CurrentWorker().Record( - worker->Process().get()->id(), - {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->Process().get()->id())}}); + worker->Pid(), {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(worker->Pid())}}); } // Record driver. for (auto driver : entry.second.registered_drivers) { stats::CurrentDriver().Record( - driver->Process().get()->id(), - {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->Process().get()->id())}}); + driver->Pid(), {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(driver->Pid())}}); } } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index aa749d035..711aaa1d2 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -5,9 +5,6 @@ #include #include #include - -#include - #include "gtest/gtest.h" #include "ray/common/client_connection.h" @@ -43,8 +40,8 @@ class WorkerPool { /// resources on the machine). /// \param worker_commands The commands used to start the worker process, grouped by /// language. - WorkerPool(boost::asio::io_service &io_service, int num_workers, - int maximum_startup_concurrency, std::shared_ptr gcs_client, + WorkerPool(int num_workers, int maximum_startup_concurrency, + std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands); /// Destructor responsible for freeing a set of workers owned by this class. @@ -55,7 +52,7 @@ class WorkerPool { /// /// \param The Worker to be registered. /// \return If the registration is successful. - Status RegisterWorker(const std::shared_ptr &worker, pid_t pid); + Status RegisterWorker(const std::shared_ptr &worker); /// Register a new driver. /// @@ -160,16 +157,14 @@ class WorkerPool { /// \param dynamic_options The dynamic options that we should add for worker command. /// \return The id of the process that we started if it's positive, /// otherwise it means we didn't start a process. - ProcessHandle StartWorkerProcess(const Language &language, - const std::vector &dynamic_options = {}); + int StartWorkerProcess(const Language &language, + const std::vector &dynamic_options = {}); /// The implementation of how to start a new worker process with command arguments. - /// The lifetime of the process is tied to that of the returned object, - /// unless the caller manually detaches the process after the call. /// /// \param worker_command_args The command arguments of new worker process. - /// \return An object representing the started worker process. - virtual ProcessHandle StartProcess(const std::vector &worker_command_args); + /// \return The process ID of started worker process. + virtual pid_t StartProcess(const std::vector &worker_command_args); /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); @@ -194,12 +189,12 @@ class WorkerPool { std::unordered_set> registered_drivers; /// A map from the pids of starting worker processes /// to the number of their unregistered workers. - std::unordered_map starting_worker_processes; + std::unordered_map starting_worker_processes; /// A map for looking up the task with dynamic options by the pid of /// worker. Note that this is used for the dedicated worker processes. - std::unordered_map dedicated_workers_to_tasks; + std::unordered_map dedicated_workers_to_tasks; /// A map for speeding up looking up the pending worker for the given task. - std::unordered_map tasks_to_dedicated_workers; + std::unordered_map tasks_to_dedicated_workers; /// We'll push a warning to the user every time a multiple of this many /// worker processes has been started. int multiple_for_warning; @@ -222,8 +217,6 @@ class WorkerPool { /// for a given language. State &GetStateForLanguage(const Language &language); - /// Required by Boost.Process for managing subprocesses (e.g. reaping zombies). - boost::asio::io_service *io_service_; /// The maximum number of worker processes that can be started concurrently. int maximum_startup_concurrency_; /// A client connection to the GCS. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 6ea279401..e5c2b7346 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -16,19 +16,17 @@ std::vector LANGUAGES = {Language::PYTHON, Language::JAVA}; class WorkerPoolMock : public WorkerPool { public: - WorkerPoolMock(boost::asio::io_service &io_service) + WorkerPoolMock() : WorkerPoolMock( - io_service, {{Language::PYTHON, {"dummy_py_worker_command", "--foo=RAY_WORKER_NUM_WORKERS_PLACEHOLDER"}}, {Language::JAVA, {"dummy_java_worker_command", "--foo=RAY_WORKER_NUM_WORKERS_PLACEHOLDER"}}}) {} - explicit WorkerPoolMock(boost::asio::io_service &io_service, - const WorkerCommandMap &worker_commands) - : WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands), - last_worker_process_() { + explicit WorkerPoolMock(const WorkerCommandMap &worker_commands) + : WorkerPool(0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands), + last_worker_pid_(0) { for (auto &entry : states_by_lang_) { entry.second.num_workers_per_process = NUM_WORKERS_PER_PROCESS; } @@ -39,30 +37,23 @@ class WorkerPoolMock : public WorkerPool { states_by_lang_.clear(); } - using WorkerPool::StartWorkerProcess; // we need this to be public for testing + void StartWorkerProcess(const Language &language, + const std::vector &dynamic_options = {}) { + WorkerPool::StartWorkerProcess(language, dynamic_options); + } - ProcessHandle StartProcess( - const std::vector &worker_command_args) override { -#ifndef PID_MAX_LIMIT - // This is defined by Linux to be the maximum allowable number of processes - // There's no guarantee for other OSes, but it's good enough for testing... - enum { PID_MAX_LIMIT = 1 << 22 }; -#endif - // Use a bogus process ID that won't conflict with those in the system - pid_t pid = static_cast(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size()); - Process proc(pid); - proc.detach(); - last_worker_process_ = std::make_shared(std::move(proc)); - worker_commands_by_proc_[last_worker_process_] = worker_command_args; - return last_worker_process_; + pid_t StartProcess(const std::vector &worker_command_args) override { + last_worker_pid_ += 1; + worker_commands_by_pid[last_worker_pid_] = worker_command_args; + return last_worker_pid_; } void WarnAboutSize() override {} - ProcessHandle LastStartedWorkerProcess() const { return last_worker_process_; } + pid_t LastStartedWorkerProcess() const { return last_worker_pid_; } - const std::vector &GetWorkerCommand(ProcessHandle proc) { - return worker_commands_by_proc_[proc]; + const std::vector &GetWorkerCommand(int pid) { + return worker_commands_by_pid[pid]; } int NumWorkersStarting() const { @@ -84,19 +75,20 @@ class WorkerPoolMock : public WorkerPool { } private: - ProcessHandle last_worker_process_; - // The worker commands by process. - std::unordered_map> worker_commands_by_proc_; + int last_worker_pid_; + // The worker commands by pid. + std::unordered_map> worker_commands_by_pid; }; class WorkerPoolTest : public ::testing::Test { public: WorkerPoolTest() - : worker_pool_(io_service_), + : worker_pool_(), + io_service_(), error_message_type_(1), client_call_manager_(io_service_) {} - std::shared_ptr CreateWorker(ProcessHandle proc, + std::shared_ptr CreateWorker(pid_t pid, const Language &language = Language::PYTHON) { std::function client_handler = [this](LocalClientConnection &client) { HandleNewClient(client); }; @@ -109,22 +101,18 @@ class WorkerPoolTest : public ::testing::Test { auto client = LocalClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); - std::shared_ptr worker = std::make_shared( - WorkerID::FromRandom(), language, -1, client, client_call_manager_); - if (proc) { - worker->SetProcess(proc); - } - return worker; + return std::shared_ptr(new Worker(WorkerID::FromRandom(), pid, language, -1, + client, client_call_manager_)); } void SetWorkerCommands(const WorkerCommandMap &worker_commands) { - WorkerPoolMock worker_pool(io_service_, worker_commands); + WorkerPoolMock worker_pool(worker_commands); this->worker_pool_ = std::move(worker_pool); } protected: - boost::asio::io_service io_service_; WorkerPoolMock worker_pool_; + boost::asio::io_service io_service_; int64_t error_message_type_; rpc::ClientCallManager client_call_manager_; @@ -154,21 +142,12 @@ static inline TaskSpecification ExampleTaskSpec( return TaskSpecification(std::move(message)); } -TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) { - typedef ProcessHandle T; - T a(std::make_shared()), b(std::make_shared()), empty = T(); - ASSERT_TRUE(std::equal_to()(a, a)); - ASSERT_TRUE(!std::equal_to()(a, b)); - ASSERT_TRUE(!std::equal_to()(b, a)); - ASSERT_TRUE(!std::equal_to()(empty, a)); - ASSERT_TRUE(!std::equal_to()(a, empty)); -} - TEST_F(WorkerPoolTest, HandleWorkerRegistration) { - ProcessHandle proc = worker_pool_.StartWorkerProcess(Language::PYTHON); + worker_pool_.StartWorkerProcess(Language::PYTHON); + pid_t pid = worker_pool_.LastStartedWorkerProcess(); std::vector> workers; for (int i = 0; i < NUM_WORKERS_PER_PROCESS; i++) { - workers.push_back(CreateWorker(ProcessHandle())); + workers.push_back(CreateWorker(pid)); } for (const auto &worker : workers) { // Check that there's still a starting worker process @@ -176,7 +155,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 1); // Check that we cannot lookup the worker before it's registered. ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); - RAY_CHECK_OK(worker_pool_.RegisterWorker(worker, proc.get()->id())); + RAY_CHECK_OK(worker_pool_.RegisterWorker(worker)); // Check that we can lookup the worker after it's registered. ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker); } @@ -202,21 +181,20 @@ TEST_F(WorkerPoolTest, StartupWorkerProcessCount) { ASSERT_TRUE(expected_worker_process_count < static_cast(desired_initial_worker_process_count_per_language * LANGUAGES.size())); - ProcessHandle last_started_worker_process; + pid_t last_started_worker_process = 0; for (int i = 0; i < desired_initial_worker_process_count_per_language; i++) { for (size_t j = 0; j < LANGUAGES.size(); j++) { worker_pool_.StartWorkerProcess(LANGUAGES[j]); ASSERT_TRUE(worker_pool_.NumWorkerProcessesStarting() <= expected_worker_process_count); - ProcessHandle prev = worker_pool_.LastStartedWorkerProcess(); - if (last_started_worker_process.get() != prev.get()) { - last_started_worker_process = prev; + if (last_started_worker_process != worker_pool_.LastStartedWorkerProcess()) { + last_started_worker_process = worker_pool_.LastStartedWorkerProcess(); const auto &real_command = worker_pool_.GetWorkerCommand(worker_pool_.LastStartedWorkerProcess()); ASSERT_EQ(real_command, worker_commands[j]); } else { - ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), - expected_worker_process_count); + ASSERT_TRUE(worker_pool_.NumWorkerProcessesStarting() == + expected_worker_process_count); ASSERT_TRUE(static_cast(i * LANGUAGES.size() + j) >= expected_worker_process_count); } @@ -246,8 +224,8 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { // Create some workers. std::unordered_set> workers; - workers.insert(CreateWorker(std::make_shared())); - workers.insert(CreateWorker(std::make_shared())); + workers.insert(CreateWorker(1234)); + workers.insert(CreateWorker(5678)); // Add the workers to the pool. for (auto &worker : workers) { worker_pool_.PushWorker(worker); @@ -266,7 +244,7 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { TEST_F(WorkerPoolTest, PopActorWorker) { // Create a worker. - auto worker = CreateWorker(std::make_shared()); + auto worker = CreateWorker(1234); // Add the worker to the pool. worker_pool_.PushWorker(worker); @@ -289,7 +267,7 @@ TEST_F(WorkerPoolTest, PopActorWorker) { TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { // Create a Python Worker, and add it to the pool - auto py_worker = CreateWorker(std::make_shared(), Language::PYTHON); + auto py_worker = CreateWorker(1234, Language::PYTHON); worker_pool_.PushWorker(py_worker); // Check that no worker will be popped if the given task is a Java task const auto java_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA); @@ -299,7 +277,7 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { ASSERT_NE(worker_pool_.PopWorker(py_task_spec), nullptr); // Create a Java Worker, and add it to the pool - auto java_worker = CreateWorker(std::make_shared(), Language::JAVA); + auto java_worker = CreateWorker(1234, Language::JAVA); worker_pool_.PushWorker(java_worker); // Check that the worker will be popped now for Java task ASSERT_NE(worker_pool_.PopWorker(java_task_spec), nullptr); diff --git a/src/ray/util/process.h b/src/ray/util/process.h deleted file mode 100644 index c34571f53..000000000 --- a/src/ray/util/process.h +++ /dev/null @@ -1,123 +0,0 @@ -#ifndef RAY_UTIL_PROCESS_H -#define RAY_UTIL_PROCESS_H - -#ifdef __linux__ -#include -#include -#include -#endif - -#include -#include -#include -#include -#include -#include -#include - -// We only define operators required by the standard library (==, hash). -// We declare but avoid defining the rest so that they're not used by accident. - -namespace ray { - -typedef boost::process::pid_t pid_t; - -class Process : public boost::process::child { - protected: - class ProcessFD { - // This class makes a best-effort attempt to keep a PID alive. - // However, it cannot make any guarantees. - // The kernel might not even support this mechanism. - // See here: https://unix.stackexchange.com/a/181249 -#ifdef __linux__ - int fd_; -#endif - public: -#ifdef __linux__ - ~ProcessFD() { - if (fd_ != -1) { - ::close(fd_); - } - } - ProcessFD(pid_t pid) : fd_(-1) { - if (pid != -1) { - char path[64]; - sprintf(path, "/proc/%d/ns/pid", static_cast(pid)); - fd_ = ::open(path, O_RDONLY); - } - } - ProcessFD(ProcessFD &&other) : fd_(std::move(other.fd_)) { other.fd_ = -1; } - ProcessFD(const ProcessFD &other) : fd_(other.fd_ != -1 ? ::dup(other.fd_) : -1) {} - ProcessFD &operator=(ProcessFD other) { - using std::swap; - swap(fd_, other.fd_); - return *this; - } -#else - ProcessFD(pid_t) {} -#endif - }; - ProcessFD fd_; - - public: - template - explicit Process(T &&... args) - : boost::process::child(std::forward(args)...), - fd_(boost::process::child::id()) {} -}; - -/// A managed equivalent of a pid_t (to manage the lifetime of each process). -/// TODO(mehrdadn): This hasn't been a great design, but we play along to -/// minimize the changes needed for Windows compatibility. -/// (We used to represent a worker process by just its pid_t, which carries -/// no ownership/lifetime information.) -/// Once this code is running properly, refactor the data structures to create -/// a better ownership structure between the worker processes and the workers. -class ProcessHandle { - std::shared_ptr proc_; - - public: - ProcessHandle(const std::shared_ptr &proc = std::shared_ptr()) - : proc_(proc) {} - Process *get() const { return proc_.get(); } - explicit operator bool() const { return !!proc_; } - static ProcessHandle FromPid(pid_t pid) { - Process temp(pid); - temp.detach(); - return std::make_shared(std::move(temp)); - } -}; - -} // namespace ray - -// Define comparators for process handles: -// - Valid process objects must be distinguished by their IDs. -// - Invalid process objects must be distinguished by their addresses. -namespace std { - -template <> -struct equal_to { - bool operator()(const ray::ProcessHandle &x, const ray::ProcessHandle &y) const { - const ray::Process *a = x.get(), *b = y.get(); - // See explanation above - return a ? b ? a->valid() - ? b->valid() ? equal_to()(a->id(), b->id()) : false - : b->valid() ? false : equal_to()(a, b) - : false - : !b; - } -}; - -template <> -struct hash { - size_t operator()(const ray::ProcessHandle &value) const { - const ray::Process *p = value.get(); - // See explanation above - return p ? p->valid() ? hash()(p->id()) : hash()(p) - : size_t(); - } -}; - -} // namespace std - -#endif diff --git a/thirdparty/patches/boost-process-teminate-waitpid-nohang.patch b/thirdparty/patches/boost-process-teminate-waitpid-nohang.patch deleted file mode 100644 index 908dd34dc..000000000 --- a/thirdparty/patches/boost-process-teminate-waitpid-nohang.patch +++ /dev/null @@ -1,14 +0,0 @@ -From 28126b3432433c025606a84474c7afb5dec88daf Mon Sep 17 00:00:00 2001 -From: Klemens David Morgenstern -Date: Sun, 12 May 2019 17:02:25 +0700 -Subject: [PATCH] osx fix - ---- -diff --git include/boost/process/detail/posix/terminate.hpp include/boost/process/detail/posix/terminate.hpp -index 84024a5..e1e5f33 100644 ---- boost/process/detail/posix/terminate.hpp -+++ boost/process/detail/posix/terminate.hpp -@@ -30,1 +30,1 @@ -- ::waitpid(p.pid, &status, 0); //just to clean it up -+ ::waitpid(p.pid, &status, WNOHANG); //just to clean it up ---