diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 6d0788225..f107f4c7a 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -39,7 +39,7 @@ def wait_for_pid_to_exit(pid, timeout=20): return time.sleep(0.1) raise RayTestTimeoutException( - "Timed out while waiting for process to exit.") + "Timed out while waiting for process {} to exit.".format(pid)) def wait_for_children_of_pid(pid, num_children=1, timeout=20): @@ -51,8 +51,8 @@ def wait_for_children_of_pid(pid, num_children=1, timeout=20): return time.sleep(0.1) raise RayTestTimeoutException( - "Timed out while waiting for process children to start " - "({}/{} started).".format(num_alive, num_children)) + "Timed out while waiting for process {} children to start " + "({}/{} started).".format(pid, num_alive, num_children)) def wait_for_children_of_pid_to_exit(pid, timeout=20): diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9cd0743fa..c25f0a250 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -89,8 +89,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, object_manager_profile_timer_(io_service), initial_config_(config), local_available_resources_(config.resource_config), - worker_pool_(config.num_initial_workers, config.maximum_startup_concurrency, - gcs_client_, config.worker_commands), + worker_pool_(io_service, config.num_initial_workers, + config.maximum_startup_concurrency, gcs_client_, + config.worker_commands), scheduling_policy_(local_queues_), reconstruction_policy_( io_service_, @@ -222,22 +223,23 @@ ray::Status NodeManager::RegisterGcs() { } void NodeManager::KillWorker(std::shared_ptr worker) { +#ifdef _WIN32 + // TODO(mehrdadn): Implement implement graceful process termination mechanism +#else // If we're just cleaning up a single worker, allow it some time to clean // up its state before force killing. The client socket will be closed // and the worker struct will be freed after the timeout. - kill(worker->Pid(), SIGTERM); + kill(worker->GetProcess().GetId(), SIGTERM); +#endif auto retry_timer = std::make_shared(io_service_); auto retry_duration = boost::posix_time::milliseconds( RayConfig::instance().kill_worker_timeout_milliseconds()); retry_timer->expires_from_now(retry_duration); retry_timer->async_wait([retry_timer, worker](const boost::system::error_code &error) { - RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->Pid(); - // Force kill worker. TODO(mehrdadn, rkn): The worker may have already died - // and had its PID reassigned to a different process, at least on Windows. - // On Linux, this may or may not be the case, depending on e.g. whether - // the process has been already waited on. Regardless, this must be fixed. - kill(worker->Pid(), SIGKILL); + RAY_LOG(DEBUG) << "Send SIGKILL to worker, pid=" << worker->GetProcess().GetId(); + // Force kill worker + worker->GetProcess().Kill(); }); } @@ -862,8 +864,9 @@ void NodeManager::ProcessClientMessage( RAY_LOG(DEBUG) << "[Worker] Message " << protocol::EnumNameMessageType(message_type_value) << "(" << message_type << ") from worker with PID " - << (registered_worker ? std::to_string(registered_worker->Pid()) - : "nil"); + << (registered_worker + ? std::to_string(registered_worker->GetProcess().GetId()) + : "nil"); if (registered_worker && registered_worker->IsDead()) { // For a worker that is marked as dead (because the job has died already), // all the messages are ignored except DisconnectClient. @@ -967,12 +970,6 @@ void NodeManager::ProcessClientMessage( void NodeManager::ProcessRegisterClientRequestMessage( const std::shared_ptr &client, const uint8_t *message_data) { client->Register(); - auto message = flatbuffers::GetRoot(message_data); - Language language = static_cast(message->language()); - WorkerID worker_id = from_flatbuf(*message->worker_id()); - auto worker = std::make_shared(worker_id, message->worker_pid(), language, - message->port(), client, client_call_manager_); - Status status; flatbuffers::FlatBufferBuilder fbb; auto reply = ray::protocol::CreateRegisterClientReply(fbb, to_flatbuf(fbb, self_node_id_)); @@ -987,24 +984,32 @@ void NodeManager::ProcessRegisterClientRequestMessage( } }); + auto message = flatbuffers::GetRoot(message_data); + Language language = static_cast(message->language()); + WorkerID worker_id = from_flatbuf(*message->worker_id()); + pid_t pid = message->worker_pid(); + auto worker = std::make_shared(worker_id, language, message->port(), client, + client_call_manager_); if (message->is_worker()) { // Register the new worker. - if (worker_pool_.RegisterWorker(worker).ok()) { + if (worker_pool_.RegisterWorker(worker, pid).ok()) { HandleWorkerAvailable(worker->Connection()); } } else { // Register the new driver. + RAY_CHECK(pid >= 0); + worker->SetProcess(Process::FromPid(pid)); const JobID job_id = from_flatbuf(*message->job_id()); // Compute a dummy driver task id from a given driver. const TaskID driver_task_id = TaskID::ComputeDriverTaskId(worker_id); worker->AssignTaskId(driver_task_id); worker->AssignJobId(job_id); - status = worker_pool_.RegisterDriver(worker); + Status status = worker_pool_.RegisterDriver(worker); if (status.ok()) { local_queues_.AddDriverTaskId(driver_task_id); - auto job_data_ptr = gcs::CreateJobTableData( - job_id, /*is_dead*/ false, std::time(nullptr), - initial_config_.node_manager_address, message->worker_pid()); + auto job_data_ptr = + gcs::CreateJobTableData(job_id, /*is_dead*/ false, std::time(nullptr), + initial_config_.node_manager_address, pid); RAY_CHECK_OK(gcs_client_->Jobs().AsyncAdd(job_data_ptr, nullptr)); } } @@ -1200,7 +1205,8 @@ void NodeManager::ProcessDisconnectClientMessage( cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); worker->ResetLifetimeResourceIds(); - RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. " + RAY_LOG(DEBUG) << "Worker (pid=" << worker->GetProcess().GetId() + << ") is disconnected. " << "job_id: " << worker->GetAssignedJobId(); // Since some resources may have been released, we can try to dispatch more tasks. @@ -1214,7 +1220,8 @@ void NodeManager::ProcessDisconnectClientMessage( local_queues_.RemoveDriverTaskId(TaskID::ComputeDriverTaskId(driver_id)); worker_pool_.DisconnectDriver(worker); - RAY_LOG(DEBUG) << "Driver (pid=" << worker->Pid() << ") is disconnected. " + RAY_LOG(DEBUG) << "Driver (pid=" << worker->GetProcess().GetId() + << ") is disconnected. " << "job_id: " << job_id; } @@ -2286,7 +2293,7 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & } RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid " - << worker->Pid() << ", worker id: " << worker->WorkerId(); + << worker->GetProcess().GetId() << ", worker id: " << worker->WorkerId(); flatbuffers::FlatBufferBuilder fbb; // Resource accounting: acquire resources for the assigned task. @@ -3137,7 +3144,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, rpc::SendReplyCallback send_reply_callback) { for (const auto &driver : worker_pool_.GetAllDrivers()) { auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(driver->Pid()); + worker_stats->set_pid(driver->GetProcess().GetId()); worker_stats->set_is_driver(true); } // NOTE(sang): Currently reporting only infeasible/ready ActorCreationTask @@ -3215,7 +3222,7 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request, << status.ToString(); } else { auto worker_stats = reply->add_workers_stats(); - worker_stats->set_pid(worker->Pid()); + worker_stats->set_pid(worker->GetProcess().GetId()); worker_stats->set_is_driver(false); reply->set_num_workers(reply->num_workers() + 1); worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats()); diff --git a/src/ray/raylet/worker.cc b/src/ray/raylet/worker.cc index 961afeba1..f3d2622e6 100644 --- a/src/ray/raylet/worker.cc +++ b/src/ray/raylet/worker.cc @@ -12,11 +12,10 @@ namespace ray { namespace raylet { /// A constructor responsible for initializing the state of a worker. -Worker::Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port, +Worker::Worker(const WorkerID &worker_id, const Language &language, int port, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager) : worker_id_(worker_id), - pid_(pid), language_(language), port_(port), connection_(connection), @@ -42,7 +41,12 @@ bool Worker::IsBlocked() const { return blocked_; } WorkerID Worker::WorkerId() const { return worker_id_; } -pid_t Worker::Pid() const { return pid_; } +Process Worker::GetProcess() const { return proc_; } + +void Worker::SetProcess(Process proc) { + RAY_CHECK(proc_.IsNull()); // this procedure should not be called multiple times + proc_ = std::move(proc); +} Language Worker::GetLanguage() const { return language_; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index 19ae2d867..cf5a13548 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -9,8 +9,7 @@ #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" #include "ray/rpc/worker/core_worker_client.h" - -#include // pid_t +#include "ray/util/process.h" namespace ray { @@ -22,7 +21,8 @@ namespace raylet { class Worker { public: /// A constructor that initializes a worker object. - Worker(const WorkerID &worker_id, pid_t pid, const Language &language, int port, + /// NOTE: You MUST manually set the worker process. + Worker(const WorkerID &worker_id, const Language &language, int port, std::shared_ptr connection, rpc::ClientCallManager &client_call_manager); /// A destructor responsible for freeing all worker state. @@ -34,8 +34,9 @@ class Worker { bool IsBlocked() const; /// Return the worker's ID. WorkerID WorkerId() const; - /// Return the worker's PID. - pid_t Pid() const; + /// Return the worker process. + Process GetProcess() const; + void SetProcess(Process proc); Language GetLanguage() const; int Port() const; void AssignTaskId(const TaskID &task_id); @@ -81,8 +82,8 @@ class Worker { private: /// The worker's ID. WorkerID worker_id_; - /// The worker's PID. - pid_t pid_; + /// The worker's process. + Process proc_; /// The language type of this worker. Language language_; /// Port that this worker listens on. diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 8858282bb..470140136 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1,10 +1,5 @@ #include "ray/raylet/worker_pool.h" -#ifdef _WIN32 -#include -#include -#endif - #include #include @@ -46,15 +41,15 @@ namespace raylet { /// A constructor that initializes a worker pool with num_workers workers for /// each language. -WorkerPool::WorkerPool(int num_workers, int maximum_startup_concurrency, +WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers, + int maximum_startup_concurrency, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands) - : maximum_startup_concurrency_(maximum_startup_concurrency), + : io_service_(&io_service), + maximum_startup_concurrency_(maximum_startup_concurrency), gcs_client_(std::move(gcs_client)) { RAY_CHECK(maximum_startup_concurrency > 0); -#ifdef _WIN32 - // TODO(mehrdadn): Is there an equivalent of this we need for Windows? -#else +#ifndef _WIN32 // Ignore SIGCHLD signals. If we don't do this, then worker processes will // become zombies instead of dying gracefully. signal(SIGCHLD, SIG_IGN); @@ -100,25 +95,23 @@ void WorkerPool::Start(int num_workers) { } WorkerPool::~WorkerPool() { - std::unordered_set pids_to_kill; + std::unordered_set procs_to_kill; for (const auto &entry : states_by_lang_) { // Kill all registered workers. NOTE(swang): This assumes that the registered // workers were started by the pool. for (const auto &worker : entry.second.registered_workers) { - pids_to_kill.insert(worker->Pid()); + procs_to_kill.insert(worker->GetProcess()); } // Kill all the workers that have been started but not registered. for (const auto &starting_worker : entry.second.starting_worker_processes) { - pids_to_kill.insert(starting_worker.first); + procs_to_kill.insert(starting_worker.first); } } - for (const auto &pid : pids_to_kill) { - RAY_CHECK(pid > 0); - kill(pid, SIGKILL); + for (Process proc : procs_to_kill) { + proc.Kill(); } - // Waiting for the workers to be killed - for (const auto &pid : pids_to_kill) { - waitpid(pid, NULL, 0); + for (Process proc : procs_to_kill) { + proc.Wait(); } } @@ -132,8 +125,8 @@ uint32_t WorkerPool::Size(const Language &language) const { } } -int WorkerPool::StartWorkerProcess(const Language &language, - const std::vector &dynamic_options) { +Process WorkerPool::StartWorkerProcess(const Language &language, + const std::vector &dynamic_options) { auto &state = GetStateForLanguage(language); // If we are already starting up too many workers, then return without starting // more. @@ -146,7 +139,7 @@ int WorkerPool::StartWorkerProcess(const Language &language, RAY_LOG(DEBUG) << "Worker not started, " << starting_workers << " workers of language type " << static_cast(language) << " pending registration"; - return -1; + return Process(); } // Either there are no workers pending registration or the worker start is being forced. RAY_LOG(DEBUG) << "Starting new worker process, current pool has " @@ -194,71 +187,14 @@ int WorkerPool::StartWorkerProcess(const Language &language, << Language_Name(language) << " worker process. But the " << kWorkerNumWorkersPlaceholder << "placeholder is not found in worker command."; - pid_t pid = StartProcess(worker_command_args); - if (pid < 0) { - // Failure case. - RAY_LOG(FATAL) << "Failed to fork worker process: " << strerror(errno); - } else if (pid > 0) { - // Parent process case. - RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start - << " worker(s) with pid " << pid; - state.starting_worker_processes.emplace(pid, workers_to_start); - return pid; - } - return -1; + Process proc = StartProcess(worker_command_args); + RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start + << " worker(s) with pid " << proc.GetId(); + state.starting_worker_processes.emplace(proc, workers_to_start); + return proc; } -#ifdef _WIN32 -// Fork + exec combo for Windows. Returns -1 on failure. -// TODO(mehrdadn): This is dangerous on Windows. -// We need to keep the actual process handle alive for the PID to stay valid. -// Make this change as soon as possible, or the PID may refer to the wrong process. -static pid_t spawnvp_wrapper(std::vector const &args) { - pid_t pid; - std::vector str_args; - for (const auto &arg : args) { - str_args.push_back(arg.c_str()); - } - str_args.push_back(NULL); - HANDLE handle = (HANDLE)spawnvp(P_NOWAIT, str_args[0], str_args.data()); - if (handle != INVALID_HANDLE_VALUE) { - pid = static_cast(GetProcessId(handle)); - if (pid == 0) { - pid = -1; - } - CloseHandle(handle); - } else { - pid = -1; - errno = EINVAL; - } - return pid; -} -#else -// Fork + exec combo for POSIX. Returns -1 on failure. -static pid_t spawnvp_wrapper(std::vector const &args) { - pid_t pid; - std::vector str_args; - for (const auto &arg : args) { - str_args.push_back(arg.c_str()); - } - str_args.push_back(NULL); - pid = fork(); - if (pid == 0) { - // Child process case. - // Reset the SIGCHLD handler for the worker. - // TODO(mehrdadn): Move any work here to the child process itself - // so that it can also be implemented on Windows. - signal(SIGCHLD, SIG_DFL); - if (execvp(str_args[0], const_cast(str_args.data())) == -1) { - pid = -1; - abort(); // fork() succeeded but exec() failed, so abort the child - } - } - return pid; -} -#endif - -pid_t WorkerPool::StartProcess(const std::vector &worker_command_args) { +Process WorkerPool::StartProcess(const std::vector &worker_command_args) { if (RAY_LOG_ENABLED(DEBUG)) { std::stringstream stream; stream << "Starting worker process with command:"; @@ -269,25 +205,31 @@ pid_t WorkerPool::StartProcess(const std::vector &worker_command_ar } // Launch the process to create the worker. - pid_t pid = spawnvp_wrapper(worker_command_args); - if (pid == -1) { - RAY_LOG(FATAL) << "Failed to start worker with error " << errno << ": " - << strerror(errno); + std::error_code ec; + std::vector argv; + for (const std::string &arg : worker_command_args) { + argv.push_back(arg.c_str()); } - return pid; + argv.push_back(NULL); + Process child(argv.data(), io_service_, ec); + if (!child.IsValid() || ec) { + // The worker failed to start. This is a fatal error. + RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": " + << ec.message(); + } + return child; } -Status WorkerPool::RegisterWorker(const std::shared_ptr &worker) { - const auto pid = worker->Pid(); +Status WorkerPool::RegisterWorker(const std::shared_ptr &worker, pid_t pid) { const auto port = worker->Port(); RAY_LOG(DEBUG) << "Registering worker with pid " << pid << ", port: " << port; auto &state = GetStateForLanguage(worker->GetLanguage()); - - auto it = state.starting_worker_processes.find(pid); + auto it = state.starting_worker_processes.find(Process::FromPid(pid)); if (it == state.starting_worker_processes.end()) { RAY_LOG(WARNING) << "Received a register request from an unknown worker " << pid; return Status::Invalid("Unknown worker"); } + worker->SetProcess(it->first); it->second--; if (it->second == 0) { state.starting_worker_processes.erase(it); @@ -332,7 +274,7 @@ void WorkerPool::PushWorker(const std::shared_ptr &worker) { << "Idle workers cannot have an assigned task ID"; auto &state = GetStateForLanguage(worker->GetLanguage()); - auto it = state.dedicated_workers_to_tasks.find(worker->Pid()); + auto it = state.dedicated_workers_to_tasks.find(worker->GetProcess()); if (it != state.dedicated_workers_to_tasks.end()) { // The worker is used for the actor creation task with dynamic options. // Put it into idle dedicated worker pool. @@ -353,7 +295,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec auto &state = GetStateForLanguage(task_spec.GetLanguage()); std::shared_ptr worker = nullptr; - int pid = -1; + Process proc; if (task_spec.IsActorCreationTask() && !task_spec.DynamicWorkerOptions().empty()) { // Code path of actor creation task with dynamic worker options. // Try to pop it from idle dedicated pool. @@ -364,15 +306,16 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec state.idle_dedicated_workers.erase(it); // Because we found a worker that can perform this task, // we can remove it from dedicated_workers_to_tasks. - state.dedicated_workers_to_tasks.erase(worker->Pid()); + state.dedicated_workers_to_tasks.erase(worker->GetProcess()); state.tasks_to_dedicated_workers.erase(task_spec.TaskId()); } else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) { // We are not pending a registration from a worker for this task, // so start a new worker process for this task. - pid = StartWorkerProcess(task_spec.GetLanguage(), task_spec.DynamicWorkerOptions()); - if (pid > 0) { - state.dedicated_workers_to_tasks[pid] = task_spec.TaskId(); - state.tasks_to_dedicated_workers[task_spec.TaskId()] = pid; + proc = + StartWorkerProcess(task_spec.GetLanguage(), task_spec.DynamicWorkerOptions()); + if (proc.IsValid()) { + state.dedicated_workers_to_tasks[proc] = task_spec.TaskId(); + state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc; } } } else if (!task_spec.IsActorTask()) { @@ -383,7 +326,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } else { // There are no more non-actor workers available to execute this task. // Start a new worker process. - pid = StartWorkerProcess(task_spec.GetLanguage()); + proc = StartWorkerProcess(task_spec.GetLanguage()); } } else { // Code path of actor task. @@ -395,7 +338,7 @@ std::shared_ptr WorkerPool::PopWorker(const TaskSpecification &task_spec } } - if (worker == nullptr && pid > 0) { + if (worker == nullptr && proc.IsValid()) { WarnAboutSize(); } @@ -408,7 +351,7 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker) { stats::CurrentWorker().Record( 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->Pid())}}); + {stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}}); return RemoveWorker(state.idle, worker); } @@ -418,7 +361,7 @@ void WorkerPool::DisconnectDriver(const std::shared_ptr &driver) { RAY_CHECK(RemoveWorker(state.registered_drivers, driver)); stats::CurrentDriver().Record( 0, {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->Pid())}}); + {stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}}); } inline WorkerPool::State &WorkerPool::GetStateForLanguage(const Language &language) { @@ -519,15 +462,17 @@ void WorkerPool::RecordMetrics() const { // Record worker. for (auto worker : entry.second.registered_workers) { stats::CurrentWorker().Record( - worker->Pid(), {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->Pid())}}); + worker->GetProcess().GetId(), + {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}}); } // Record driver. for (auto driver : entry.second.registered_drivers) { stats::CurrentDriver().Record( - driver->Pid(), {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->Pid())}}); + driver->GetProcess().GetId(), + {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, + {stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}}); } } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 711aaa1d2..2435c07c7 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -2,11 +2,13 @@ #define RAY_RAYLET_WORKER_POOL_H #include + +#include #include #include #include -#include "gtest/gtest.h" +#include "gtest/gtest.h" #include "ray/common/client_connection.h" #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" @@ -40,8 +42,8 @@ class WorkerPool { /// resources on the machine). /// \param worker_commands The commands used to start the worker process, grouped by /// language. - WorkerPool(int num_workers, int maximum_startup_concurrency, - std::shared_ptr gcs_client, + WorkerPool(boost::asio::io_service &io_service, int num_workers, + int maximum_startup_concurrency, std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands); /// Destructor responsible for freeing a set of workers owned by this class. @@ -52,7 +54,7 @@ class WorkerPool { /// /// \param The Worker to be registered. /// \return If the registration is successful. - Status RegisterWorker(const std::shared_ptr &worker); + Status RegisterWorker(const std::shared_ptr &worker, pid_t pid); /// Register a new driver. /// @@ -157,14 +159,16 @@ class WorkerPool { /// \param dynamic_options The dynamic options that we should add for worker command. /// \return The id of the process that we started if it's positive, /// otherwise it means we didn't start a process. - int StartWorkerProcess(const Language &language, - const std::vector &dynamic_options = {}); + Process StartWorkerProcess(const Language &language, + const std::vector &dynamic_options = {}); /// The implementation of how to start a new worker process with command arguments. + /// The lifetime of the process is tied to that of the returned object, + /// unless the caller manually detaches the process after the call. /// /// \param worker_command_args The command arguments of new worker process. - /// \return The process ID of started worker process. - virtual pid_t StartProcess(const std::vector &worker_command_args); + /// \return An object representing the started worker process. + virtual Process StartProcess(const std::vector &worker_command_args); /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); @@ -189,12 +193,12 @@ class WorkerPool { std::unordered_set> registered_drivers; /// A map from the pids of starting worker processes /// to the number of their unregistered workers. - std::unordered_map starting_worker_processes; + std::unordered_map starting_worker_processes; /// A map for looking up the task with dynamic options by the pid of /// worker. Note that this is used for the dedicated worker processes. - std::unordered_map dedicated_workers_to_tasks; + std::unordered_map dedicated_workers_to_tasks; /// A map for speeding up looking up the pending worker for the given task. - std::unordered_map tasks_to_dedicated_workers; + std::unordered_map tasks_to_dedicated_workers; /// We'll push a warning to the user every time a multiple of this many /// worker processes has been started. int multiple_for_warning; @@ -217,6 +221,8 @@ class WorkerPool { /// for a given language. State &GetStateForLanguage(const Language &language); + /// For Process class for managing subprocesses (e.g. reaping zombies). + boost::asio::io_service *io_service_; /// The maximum number of worker processes that can be started concurrently. int maximum_startup_concurrency_; /// A client connection to the GCS. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index e5c2b7346..a615d0016 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -1,9 +1,10 @@ +#include "ray/raylet/worker_pool.h" + #include "gmock/gmock.h" #include "gtest/gtest.h" - #include "ray/common/constants.h" #include "ray/raylet/node_manager.h" -#include "ray/raylet/worker_pool.h" +#include "ray/util/process.h" namespace ray { @@ -16,17 +17,19 @@ std::vector LANGUAGES = {Language::PYTHON, Language::JAVA}; class WorkerPoolMock : public WorkerPool { public: - WorkerPoolMock() + WorkerPoolMock(boost::asio::io_service &io_service) : WorkerPoolMock( + io_service, {{Language::PYTHON, {"dummy_py_worker_command", "--foo=RAY_WORKER_NUM_WORKERS_PLACEHOLDER"}}, {Language::JAVA, {"dummy_java_worker_command", "--foo=RAY_WORKER_NUM_WORKERS_PLACEHOLDER"}}}) {} - explicit WorkerPoolMock(const WorkerCommandMap &worker_commands) - : WorkerPool(0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands), - last_worker_pid_(0) { + explicit WorkerPoolMock(boost::asio::io_service &io_service, + const WorkerCommandMap &worker_commands) + : WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands), + last_worker_process_() { for (auto &entry : states_by_lang_) { entry.second.num_workers_per_process = NUM_WORKERS_PER_PROCESS; } @@ -37,23 +40,22 @@ class WorkerPoolMock : public WorkerPool { states_by_lang_.clear(); } - void StartWorkerProcess(const Language &language, - const std::vector &dynamic_options = {}) { - WorkerPool::StartWorkerProcess(language, dynamic_options); - } + using WorkerPool::StartWorkerProcess; // we need this to be public for testing - pid_t StartProcess(const std::vector &worker_command_args) override { - last_worker_pid_ += 1; - worker_commands_by_pid[last_worker_pid_] = worker_command_args; - return last_worker_pid_; + Process StartProcess(const std::vector &worker_command_args) override { + // Use a bogus process ID that won't conflict with those in the system + pid_t pid = static_cast(PID_MAX_LIMIT + 1 + worker_commands_by_proc_.size()); + last_worker_process_ = Process::FromPid(pid); + worker_commands_by_proc_[last_worker_process_] = worker_command_args; + return last_worker_process_; } void WarnAboutSize() override {} - pid_t LastStartedWorkerProcess() const { return last_worker_pid_; } + Process LastStartedWorkerProcess() const { return last_worker_process_; } - const std::vector &GetWorkerCommand(int pid) { - return worker_commands_by_pid[pid]; + const std::vector &GetWorkerCommand(Process proc) { + return worker_commands_by_proc_[proc]; } int NumWorkersStarting() const { @@ -75,20 +77,19 @@ class WorkerPoolMock : public WorkerPool { } private: - int last_worker_pid_; - // The worker commands by pid. - std::unordered_map> worker_commands_by_pid; + Process last_worker_process_; + // The worker commands by process. + std::unordered_map> worker_commands_by_proc_; }; class WorkerPoolTest : public ::testing::Test { public: WorkerPoolTest() - : worker_pool_(), - io_service_(), + : worker_pool_(io_service_), error_message_type_(1), client_call_manager_(io_service_) {} - std::shared_ptr CreateWorker(pid_t pid, + std::shared_ptr CreateWorker(Process proc, const Language &language = Language::PYTHON) { std::function client_handler = [this](LocalClientConnection &client) { HandleNewClient(client); }; @@ -101,18 +102,22 @@ class WorkerPoolTest : public ::testing::Test { auto client = LocalClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); - return std::shared_ptr(new Worker(WorkerID::FromRandom(), pid, language, -1, - client, client_call_manager_)); + std::shared_ptr worker = std::make_shared( + WorkerID::FromRandom(), language, -1, client, client_call_manager_); + if (!proc.IsNull()) { + worker->SetProcess(proc); + } + return worker; } void SetWorkerCommands(const WorkerCommandMap &worker_commands) { - WorkerPoolMock worker_pool(worker_commands); + WorkerPoolMock worker_pool(io_service_, worker_commands); this->worker_pool_ = std::move(worker_pool); } protected: - WorkerPoolMock worker_pool_; boost::asio::io_service io_service_; + WorkerPoolMock worker_pool_; int64_t error_message_type_; rpc::ClientCallManager client_call_manager_; @@ -142,12 +147,25 @@ static inline TaskSpecification ExampleTaskSpec( return TaskSpecification(std::move(message)); } +TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) { + typedef Process T; + T a(T::CreateNewDummy()), b(T::CreateNewDummy()), empty = T(); + ASSERT_TRUE(empty.IsNull()); + ASSERT_TRUE(!empty.IsValid()); + ASSERT_TRUE(!a.IsNull()); + ASSERT_TRUE(!a.IsValid()); // a dummy process is not a valid process! + ASSERT_TRUE(std::equal_to()(a, a)); + ASSERT_TRUE(!std::equal_to()(a, b)); + ASSERT_TRUE(!std::equal_to()(b, a)); + ASSERT_TRUE(!std::equal_to()(empty, a)); + ASSERT_TRUE(!std::equal_to()(a, empty)); +} + TEST_F(WorkerPoolTest, HandleWorkerRegistration) { - worker_pool_.StartWorkerProcess(Language::PYTHON); - pid_t pid = worker_pool_.LastStartedWorkerProcess(); + Process proc = worker_pool_.StartWorkerProcess(Language::PYTHON); std::vector> workers; for (int i = 0; i < NUM_WORKERS_PER_PROCESS; i++) { - workers.push_back(CreateWorker(pid)); + workers.push_back(CreateWorker(Process())); } for (const auto &worker : workers) { // Check that there's still a starting worker process @@ -155,7 +173,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), 1); // Check that we cannot lookup the worker before it's registered. ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), nullptr); - RAY_CHECK_OK(worker_pool_.RegisterWorker(worker)); + RAY_CHECK_OK(worker_pool_.RegisterWorker(worker, proc.GetId())); // Check that we can lookup the worker after it's registered. ASSERT_EQ(worker_pool_.GetRegisteredWorker(worker->Connection()), worker); } @@ -181,20 +199,21 @@ TEST_F(WorkerPoolTest, StartupWorkerProcessCount) { ASSERT_TRUE(expected_worker_process_count < static_cast(desired_initial_worker_process_count_per_language * LANGUAGES.size())); - pid_t last_started_worker_process = 0; + Process last_started_worker_process; for (int i = 0; i < desired_initial_worker_process_count_per_language; i++) { for (size_t j = 0; j < LANGUAGES.size(); j++) { worker_pool_.StartWorkerProcess(LANGUAGES[j]); ASSERT_TRUE(worker_pool_.NumWorkerProcessesStarting() <= expected_worker_process_count); - if (last_started_worker_process != worker_pool_.LastStartedWorkerProcess()) { - last_started_worker_process = worker_pool_.LastStartedWorkerProcess(); + Process prev = worker_pool_.LastStartedWorkerProcess(); + if (!std::equal_to()(last_started_worker_process, prev)) { + last_started_worker_process = prev; const auto &real_command = worker_pool_.GetWorkerCommand(worker_pool_.LastStartedWorkerProcess()); ASSERT_EQ(real_command, worker_commands[j]); } else { - ASSERT_TRUE(worker_pool_.NumWorkerProcessesStarting() == - expected_worker_process_count); + ASSERT_EQ(worker_pool_.NumWorkerProcessesStarting(), + expected_worker_process_count); ASSERT_TRUE(static_cast(i * LANGUAGES.size() + j) >= expected_worker_process_count); } @@ -224,8 +243,8 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { // Create some workers. std::unordered_set> workers; - workers.insert(CreateWorker(1234)); - workers.insert(CreateWorker(5678)); + workers.insert(CreateWorker(Process::CreateNewDummy())); + workers.insert(CreateWorker(Process::CreateNewDummy())); // Add the workers to the pool. for (auto &worker : workers) { worker_pool_.PushWorker(worker); @@ -244,7 +263,7 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) { TEST_F(WorkerPoolTest, PopActorWorker) { // Create a worker. - auto worker = CreateWorker(1234); + auto worker = CreateWorker(Process::CreateNewDummy()); // Add the worker to the pool. worker_pool_.PushWorker(worker); @@ -267,7 +286,7 @@ TEST_F(WorkerPoolTest, PopActorWorker) { TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { // Create a Python Worker, and add it to the pool - auto py_worker = CreateWorker(1234, Language::PYTHON); + auto py_worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON); worker_pool_.PushWorker(py_worker); // Check that no worker will be popped if the given task is a Java task const auto java_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA); @@ -277,7 +296,7 @@ TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { ASSERT_NE(worker_pool_.PopWorker(py_task_spec), nullptr); // Create a Java Worker, and add it to the pool - auto java_worker = CreateWorker(1234, Language::JAVA); + auto java_worker = CreateWorker(Process::CreateNewDummy(), Language::JAVA); worker_pool_.PushWorker(java_worker); // Check that the worker will be popped now for Java task ASSERT_NE(worker_pool_.PopWorker(java_task_spec), nullptr); diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc new file mode 100644 index 000000000..48ffc4194 --- /dev/null +++ b/src/ray/util/process.cc @@ -0,0 +1,319 @@ +#include "ray/util/process.h" + +#ifdef _WIN32 +#include +#else +#include +#include +#include +#endif +#include + +#include +#include +#include + +#include "ray/util/logging.h" + +namespace ray { + +class ProcessFD { + pid_t pid_; + intptr_t fd_; + + public: + ~ProcessFD(); + ProcessFD(); + ProcessFD(pid_t pid, intptr_t fd = -1); + ProcessFD(const ProcessFD &other); + ProcessFD(ProcessFD &&other); + ProcessFD &operator=(const ProcessFD &other); + ProcessFD &operator=(ProcessFD &&other); + intptr_t CloneFD() const; + void CloseFD(); + intptr_t GetFD() const; + pid_t GetId() const; + + // Fork + exec combo. Returns -1 for the PID on failure. + static ProcessFD spawnvp(const char *argv[], std::error_code &ec) { + ec = std::error_code(); + intptr_t fd; + pid_t pid; +#ifdef _WIN32 + fd = _spawnvp(P_NOWAIT, argv[0], argv); + if (fd != -1) { + pid = static_cast(GetProcessId(reinterpret_cast(fd))); + if (pid == 0) { + pid = -1; + } + } else { + pid = -1; + } + if (pid == -1) { + ec = std::error_code(GetLastError(), std::system_category()); + } +#else + // TODO(mehrdadn): Use clone() on Linux or posix_spawnp() on Mac to avoid duplicating + // file descriptors into the child process, as that can be problematic. + pid = fork(); + if (pid == 0) { + // Child process case. Reset the SIGCHLD handler for the worker. + signal(SIGCHLD, SIG_DFL); + if (execvp(argv[0], const_cast(argv)) == -1) { + pid = -1; + abort(); // fork() succeeded but exec() failed, so abort the child + } + } + if (pid == -1) { + ec = std::error_code(errno, std::system_category()); + } + // TODO(mehrdadn): This would be a good place to open a descriptor later + fd = -1; +#endif + return ProcessFD(pid, fd); + } +}; + +ProcessFD::~ProcessFD() { + if (fd_ != -1) { +#ifdef _WIN32 + CloseHandle(reinterpret_cast(fd_)); +#else + close(static_cast(fd_)); +#endif + } +} + +ProcessFD::ProcessFD() : pid_(-1), fd_(-1) {} + +ProcessFD::ProcessFD(pid_t pid, intptr_t fd) : pid_(pid), fd_(fd) { + if (pid != -1) { + bool process_does_not_exist = false; + std::error_code error; +#ifdef _WIN32 + if (fd == -1) { + BOOL inheritable = FALSE; + DWORD permissions = MAXIMUM_ALLOWED; + HANDLE handle = OpenProcess(permissions, inheritable, static_cast(pid)); + if (handle) { + fd_ = reinterpret_cast(handle); + } else { + DWORD error_code = GetLastError(); + error = std::error_code(error_code, std::system_category()); + if (error_code == ERROR_INVALID_PARAMETER) { + process_does_not_exist = true; + } + } + } else { + RAY_CHECK(pid == GetProcessId(reinterpret_cast(fd))); + } +#else + if (kill(pid, 0) == -1 && errno == ESRCH) { + process_does_not_exist = true; + } +#endif + // Don't verify anything if the PID is too high, since that's used for testing + if (pid < PID_MAX_LIMIT) { + if (process_does_not_exist) { + // NOTE: This indicates a race condition where a process died and its process + // table entry was removed before the ProcessFD could be instantiated. For + // processes owned by this process, we should make this impossible by keeping + // the SIGCHLD signal. For processes not owned by this process, we need to come up + // with a strategy to create this class in a way that avoids race conditions. + RAY_LOG(ERROR) << "Process " << pid << " does not exist."; + } + if (error) { + // TODO(mehrdadn): Should this be fatal, or perhaps returned as an error code? + // Failures might occur due to reasons such as permission issues. + RAY_LOG(ERROR) << "error " << error << " opening process " << pid << ": " + << error.message(); + } + } + } +} + +ProcessFD::ProcessFD(const ProcessFD &other) : ProcessFD(other.pid_, other.CloneFD()) {} + +ProcessFD::ProcessFD(ProcessFD &&other) : ProcessFD() { *this = std::move(other); } + +ProcessFD &ProcessFD::operator=(const ProcessFD &other) { + if (this != &other) { + // Construct a copy, then call the move constructor + *this = static_cast(other); + } + return *this; +} + +ProcessFD &ProcessFD::operator=(ProcessFD &&other) { + if (this != &other) { + // We use swap() to make sure the argument is actually moved from + using std::swap; + swap(pid_, other.pid_); + swap(fd_, other.fd_); + } + return *this; +} + +intptr_t ProcessFD::CloneFD() const { + intptr_t fd; + if (fd_ != -1) { +#ifdef _WIN32 + HANDLE handle; + BOOL inheritable = FALSE; + fd = DuplicateHandle(GetCurrentProcess(), reinterpret_cast(fd_), + GetCurrentProcess(), &handle, 0, inheritable, + DUPLICATE_SAME_ACCESS) + ? reinterpret_cast(handle) + : -1; +#else + fd = dup(static_cast(fd_)); +#endif + RAY_DCHECK(fd != -1); + } else { + fd = -1; + } + return fd; +} + +void ProcessFD::CloseFD() { fd_ = -1; } + +intptr_t ProcessFD::GetFD() const { return fd_; } + +pid_t ProcessFD::GetId() const { return pid_; } + +Process::~Process() {} + +Process::Process() {} + +Process::Process(const Process &) = default; + +Process::Process(Process &&) = default; + +Process &Process::operator=(Process other) { + p_ = std::move(other.p_); + return *this; +} + +Process::Process(pid_t pid) { p_ = std::make_shared(pid); } + +Process::Process(const char *argv[], void *io_service, std::error_code &ec) { + (void)io_service; + ProcessFD procfd = ProcessFD::spawnvp(argv, ec); + if (!ec) { + p_ = std::make_shared(std::move(procfd)); + } +} + +Process Process::CreateNewDummy() { + pid_t pid = -1; + Process result(pid); + return result; +} + +Process Process::FromPid(pid_t pid) { + RAY_DCHECK(pid >= 0); + Process result(pid); + return result; +} + +const void *Process::Get() const { return p_ ? &*p_ : NULL; } + +pid_t Process::GetId() const { return p_ ? p_->GetId() : -1; } + +bool Process::IsNull() const { return !p_; } + +bool Process::IsValid() const { return GetId() != -1; } + +int Process::Wait() const { + int status; + if (p_) { + pid_t pid = p_->GetId(); + if (pid >= 0) { + std::error_code error; + intptr_t fd = p_->GetFD(); +#ifdef _WIN32 + HANDLE handle = fd != -1 ? reinterpret_cast(fd) : NULL; + DWORD exit_code = STILL_ACTIVE; + if (WaitForSingleObject(handle, INFINITE) == WAIT_OBJECT_0 && + GetExitCodeProcess(handle, &exit_code)) { + status = static_cast(exit_code); + } else { + error = std::error_code(GetLastError(), std::system_category()); + status = -1; + } +#else + (void)fd; + if (waitpid(pid, &status, 0) != 0) { + error = std::error_code(errno, std::system_category()); + } +#endif + if (error) { + RAY_LOG(ERROR) << "Failed to wait for process " << pid << " with error " << error + << ": " << error.message(); + } + } else { + // (Dummy process case) + status = 0; + } + } else { + // (Null process case) + status = -1; + } + return status; +} + +void Process::Kill() { + if (p_) { + pid_t pid = p_->GetId(); + if (pid >= 0) { + std::error_code error; + intptr_t fd = p_->GetFD(); +#ifdef _WIN32 + HANDLE handle = fd != -1 ? reinterpret_cast(fd) : NULL; + if (!::TerminateProcess(handle, ERROR_PROCESS_ABORTED)) { + error = std::error_code(GetLastError(), std::system_category()); + } +#else + (void)fd; + if (kill(pid, SIGKILL) != 0) { + error = std::error_code(errno, std::system_category()); + } +#endif + if (error) { + RAY_LOG(ERROR) << "Failed to kill processs " << pid << " with error " << error + << ": " << error.message(); + } + } else { + // (Dummy process case) + // Theoretically we could keep around an exit code here for Wait() to return, + // but we might as well pretend this fake process had already finished running. + // So don't bother doing anything. + } + } else { + // (Null process case) + } +} + +} // namespace ray + +namespace std { + +bool equal_to::operator()(const ray::Process &x, + const ray::Process &y) const { + return !x.IsNull() + ? !y.IsNull() + ? x.IsValid() + ? y.IsValid() ? equal_to()(x.GetId(), y.GetId()) : false + : y.IsValid() ? false + : equal_to()(x.Get(), y.Get()) + : false + : y.IsNull(); +} + +size_t hash::operator()(const ray::Process &value) const { + return !value.IsNull() ? value.IsValid() ? hash()(value.GetId()) + : hash()(value.Get()) + : size_t(); +} + +} // namespace std diff --git a/src/ray/util/process.h b/src/ray/util/process.h new file mode 100644 index 000000000..cc5b96321 --- /dev/null +++ b/src/ray/util/process.h @@ -0,0 +1,81 @@ +#ifndef RAY_UTIL_PROCESS_H +#define RAY_UTIL_PROCESS_H + +#ifdef __linux__ +#include +#include +#include +#endif + +#include +#include +#include +#include + +#ifndef PID_MAX_LIMIT +// This is defined by Linux to be the maximum allowable number of processes +// There's no guarantee for other OSes, but it's useful for testing purposes. +enum { PID_MAX_LIMIT = 1 << 22 }; +#endif + +namespace ray { + +#ifdef _WIN32 +typedef int pid_t; +#endif + +class ProcessFD; + +class Process { + protected: + std::shared_ptr p_; + + explicit Process(pid_t pid); + + public: + ~Process(); + /// Creates a null process object. Two null process objects are assumed equal. + Process(); + Process(const Process &); + Process(Process &&); + Process &operator=(Process other); + /// Creates a new process. + /// \param[in] argv The command-line of the process to spawn (terminated with NULL). + /// \param[in] io_service Boost.Asio I/O service (optional). + /// \param[in] ec Returns any error that occurred when spawning the process. + explicit Process(const char *argv[], void *io_service, std::error_code &ec); + static Process CreateNewDummy(); + static Process FromPid(pid_t pid); + pid_t GetId() const; + /// Returns an opaque pointer or handle to the underlying process object. + /// Implementation detail, used only for identity testing. Do not dereference. + const void *Get() const; + bool IsNull() const; + bool IsValid() const; + /// Forcefully kills the process. Unsafe for unowned processes. + void Kill(); + /// Waits for process to terminate. Not supported for unowned processes. + /// \return The process's exit code. Returns 0 for a dummy process, -1 for a null one. + int Wait() const; +}; + +} // namespace ray + +// We only define operators required by the standard library (==, hash): +// - Valid process objects must be distinguished by their IDs. +// - Invalid process objects must be distinguished by their addresses. +namespace std { + +template <> +struct equal_to { + bool operator()(const ray::Process &x, const ray::Process &y) const; +}; + +template <> +struct hash { + size_t operator()(const ray::Process &value) const; +}; + +} // namespace std + +#endif