From c11855728a306d3df970894211e15e509ddfdfb8 Mon Sep 17 00:00:00 2001 From: ChenZhilei <2522134184@qq.com> Date: Wed, 1 Jul 2020 20:01:52 +0800 Subject: [PATCH] Remove raylet monitor after use GCS service (#9179) --- BUILD.bazel | 19 +-- python/ray/ray_constants.py | 1 - python/ray/scripts/scripts.py | 1 - python/ray/services.py | 44 ------- python/setup.py | 1 - src/ray/common/test_util.cc | 2 - src/ray/common/test_util.h | 2 - src/ray/core_worker/test/core_worker_test.cc | 13 +- src/ray/raylet/monitor.cc | 121 ------------------- src/ray/raylet/monitor.h | 75 ------------ src/ray/raylet/monitor_main.cc | 80 ------------ 11 files changed, 7 insertions(+), 352 deletions(-) delete mode 100644 src/ray/raylet/monitor.cc delete mode 100644 src/ray/raylet/monitor.h delete mode 100644 src/ray/raylet/monitor_main.cc diff --git a/BUILD.bazel b/BUILD.bazel index 6a63cf12a..69cb2ffbf 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -481,20 +481,6 @@ cc_binary( ], ) -cc_binary( - name = "raylet_monitor", - srcs = [ - "src/ray/raylet/monitor.cc", - "src/ray/raylet/monitor.h", - "src/ray/raylet/monitor_main.cc", - ], - copts = COPTS, - deps = [ - ":gcs", - ":ray_util", - ], -) - cc_library( name = "gcs_pub_sub_lib", srcs = glob( @@ -723,7 +709,7 @@ cc_binary( cc_test( name = "core_worker_test", srcs = ["src/ray/core_worker/test/core_worker_test.cc"], - args = ["$(location //:plasma_store_server) $(location raylet) $(location raylet_monitor) $(location mock_worker) $(location gcs_server) $(location redis-cli) $(location redis-server) $(location libray_redis_module.so)"], + args = ["$(location //:plasma_store_server) $(location raylet) $(location mock_worker) $(location gcs_server) $(location redis-cli) $(location redis-server) $(location libray_redis_module.so)"], copts = COPTS, data = [ "//:gcs_server", @@ -731,7 +717,6 @@ cc_test( "//:mock_worker", "//:plasma_store_server", "//:raylet", - "//:raylet_monitor", "//:redis-cli", "//:redis-server", ], @@ -1831,7 +1816,6 @@ genrule( "//:redis-cli", "//:libray_redis_module.so", "//:raylet", - "//:raylet_monitor", "//:gcs_server", "//:plasma_store_server", "//streaming:copy_streaming_py_proto", @@ -1857,7 +1841,6 @@ genrule( cp -f $(location //:redis-cli) "$$WORK_DIR/python/ray/core/src/ray/thirdparty/redis/src/" && mkdir -p "$$WORK_DIR/python/ray/core/src/ray/gcs/redis_module/" && cp -f $(locations //:libray_redis_module.so) "$$WORK_DIR/python/ray/core/src/ray/gcs/redis_module/" && - cp -f $(location //:raylet_monitor) "$$WORK_DIR/python/ray/core/src/ray/raylet/" && cp -f $(location //:plasma_store_server) "$$WORK_DIR/python/ray/core/src/plasma/" && cp -f $(location //:raylet) "$$WORK_DIR/python/ray/core/src/ray/raylet/" && cp -f $(location //:gcs_server) "$$WORK_DIR/python/ray/core/src/ray/gcs/" && diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index e8d45be1b..93a124923 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -166,7 +166,6 @@ LOGGER_LEVEL_HELP = ("The logging level threshold, choices=['debug', 'info'," # Constants used to define the different process types. PROCESS_TYPE_REAPER = "reaper" PROCESS_TYPE_MONITOR = "monitor" -PROCESS_TYPE_RAYLET_MONITOR = "raylet_monitor" PROCESS_TYPE_LOG_MONITOR = "log_monitor" PROCESS_TYPE_REPORTER = "reporter" PROCESS_TYPE_DASHBOARD = "dashboard" diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 2598412bd..2ca66c602 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -570,7 +570,6 @@ def stop(force, verbose): # Keyword to filter, filter by command (True)/filter by args (False) ["raylet", True], ["plasma_store", True], - ["raylet_monitor", True], ["gcs_server", True], ["monitor.py", False], ["redis-server", False], diff --git a/python/ray/services.py b/python/ray/services.py index 8515325a9..4cb844974 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -54,9 +54,6 @@ PLASMA_STORE_EXECUTABLE = os.path.join( "core/src/plasma/plasma_store_server") # Location of the raylet executables. -RAYLET_MONITOR_EXECUTABLE = os.path.join( - os.path.abspath(os.path.dirname(__file__)), - "core/src/ray/raylet/raylet_monitor") RAYLET_EXECUTABLE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/ray/raylet/raylet") GCS_SERVER_EXECUTABLE = os.path.join( @@ -1737,44 +1734,3 @@ def start_monitor(redis_address, stderr_file=stderr_file, fate_share=fate_share) return process_info - - -def start_raylet_monitor(redis_address, - stdout_file=None, - stderr_file=None, - redis_password=None, - config=None, - fate_share=None): - """Run a process to monitor the other processes. - - Args: - redis_address (str): The address that the Redis server is listening on. - stdout_file: A file handle opened for writing to redirect stdout to. If - no redirection should happen, then this should be None. - stderr_file: A file handle opened for writing to redirect stderr to. If - no redirection should happen, then this should be None. - redis_password (str): The password of the redis server. - config (dict|None): Optional configuration that will - override defaults in RayConfig. - - Returns: - ProcessInfo for the process that was started. - """ - gcs_ip_address, gcs_port = redis_address.split(":") - redis_password = redis_password or "" - config_str = ",".join(["{},{}".format(*kv) for kv in config.items()]) - command = [ - RAYLET_MONITOR_EXECUTABLE, - "--redis_address={}".format(gcs_ip_address), - "--redis_port={}".format(gcs_port), - "--config_list={}".format(config_str), - ] - if redis_password: - command += ["--redis_password={}".format(redis_password)] - process_info = start_ray_process( - command, - ray_constants.PROCESS_TYPE_RAYLET_MONITOR, - stdout_file=stdout_file, - stderr_file=stderr_file, - fate_share=fate_share) - return process_info diff --git a/python/setup.py b/python/setup.py index 3122ae38c..0427306a4 100644 --- a/python/setup.py +++ b/python/setup.py @@ -26,7 +26,6 @@ ray_files = [ "ray/core/src/ray/gcs/redis_module/libray_redis_module.so", "ray/core/src/plasma/plasma_store_server" + exe_suffix, "ray/_raylet" + pyd_suffix, - "ray/core/src/ray/raylet/raylet_monitor" + exe_suffix, "ray/core/src/ray/gcs/gcs_server" + exe_suffix, "ray/core/src/ray/raylet/raylet" + exe_suffix, "ray/streaming/_streaming.so", diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index ab6b97e3f..c1824a7c6 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -239,7 +239,5 @@ std::string TEST_GCS_SERVER_EXEC_PATH; std::string TEST_RAYLET_EXEC_PATH; /// Path to mock worker executable binary. Required by raylet. std::string TEST_MOCK_WORKER_EXEC_PATH; -/// Path to raylet monitor executable binary. -std::string TEST_RAYLET_MONITOR_EXEC_PATH; } // namespace ray diff --git a/src/ray/common/test_util.h b/src/ray/common/test_util.h index 4fb2e9332..3516bb337 100644 --- a/src/ray/common/test_util.h +++ b/src/ray/common/test_util.h @@ -75,8 +75,6 @@ extern std::string TEST_GCS_SERVER_EXEC_PATH; extern std::string TEST_RAYLET_EXEC_PATH; /// Path to mock worker executable binary. Required by raylet. extern std::string TEST_MOCK_WORKER_EXEC_PATH; -/// Path to raylet monitor executable binary. -extern std::string TEST_RAYLET_MONITOR_EXEC_PATH; //-------------------------------------------------------------------------------- // COMPONENT MANAGEMENT CLASSES FOR TEST CASES diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 193cf2899..dbdade761 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -895,7 +895,7 @@ TEST_F(TwoNodeTest, TestActorTaskCrossNodesFailure) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); - RAY_CHECK(argc == 9); + RAY_CHECK(argc == 8); ray::TEST_STORE_EXEC_PATH = std::string(argv[1]); ray::TEST_RAYLET_EXEC_PATH = std::string(argv[2]); @@ -904,12 +904,11 @@ int main(int argc, char **argv) { std::uniform_int_distribution random_gen{2000, 2009}; // Use random port to avoid port conflicts between UTs. node_manager_port = random_gen(gen); - ray::TEST_RAYLET_MONITOR_EXEC_PATH = std::string(argv[3]); - ray::TEST_MOCK_WORKER_EXEC_PATH = std::string(argv[4]); - ray::TEST_GCS_SERVER_EXEC_PATH = std::string(argv[5]); + ray::TEST_MOCK_WORKER_EXEC_PATH = std::string(argv[3]); + ray::TEST_GCS_SERVER_EXEC_PATH = std::string(argv[4]); - ray::TEST_REDIS_CLIENT_EXEC_PATH = std::string(argv[6]); - ray::TEST_REDIS_SERVER_EXEC_PATH = std::string(argv[7]); - ray::TEST_REDIS_MODULE_LIBRARY_PATH = std::string(argv[8]); + ray::TEST_REDIS_CLIENT_EXEC_PATH = std::string(argv[5]); + ray::TEST_REDIS_SERVER_EXEC_PATH = std::string(argv[6]); + ray::TEST_REDIS_MODULE_LIBRARY_PATH = std::string(argv[7]); return RUN_ALL_TESTS(); } diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc deleted file mode 100644 index 9a931deea..000000000 --- a/src/ray/raylet/monitor.cc +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/raylet/monitor.h" - -#include "ray/common/ray_config.h" -#include "ray/common/status.h" -#include "ray/gcs/pb_util.h" -#include "ray/util/util.h" - -namespace ray { - -namespace raylet { - -/// \class Monitor -/// -/// The monitor is responsible for listening for heartbeats from Raylets and -/// deciding when a Raylet has died. If the monitor does not hear from a Raylet -/// within heartbeat_timeout_milliseconds * num_heartbeats_timeout (defined in -/// the Ray configuration), then the monitor will mark that Raylet as dead in -/// the client table, which broadcasts the event to all other Raylets. -Monitor::Monitor(boost::asio::io_service &io_service, - const gcs::GcsClientOptions &gcs_client_options) - : gcs_client_(new gcs::RedisGcsClient(gcs_client_options)), - num_heartbeats_timeout_(RayConfig::instance().num_heartbeats_timeout()), - heartbeat_timer_(io_service) { - RAY_CHECK_OK(gcs_client_->Connect(io_service)); -} - -void Monitor::HandleHeartbeat(const ClientID &node_id, - const HeartbeatTableData &heartbeat_data) { - heartbeats_[node_id] = num_heartbeats_timeout_; - heartbeat_buffer_[node_id] = heartbeat_data; -} - -void Monitor::Start() { - const auto heartbeat_callback = [this](const ClientID &id, - const HeartbeatTableData &heartbeat_data) { - HandleHeartbeat(id, heartbeat_data); - }; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncSubscribeHeartbeat(heartbeat_callback, nullptr)); - Tick(); -} - -/// A periodic timer that checks for timed out clients. -void Monitor::Tick() { - for (auto it = heartbeats_.begin(); it != heartbeats_.end();) { - it->second--; - if (it->second == 0) { - if (dead_nodes_.count(it->first) == 0) { - auto node_id = it->first; - RAY_LOG(WARNING) << "Node timed out: " << node_id; - auto lookup_callback = [this, node_id](Status status, - const std::vector &all_node) { - RAY_CHECK(status.ok()) << status.CodeAsString(); - bool marked = false; - for (const auto &node : all_node) { - if (node_id.Binary() == node.node_id() && node.state() == GcsNodeInfo::DEAD) { - // The node has been marked dead by itself. - marked = true; - } - } - if (!marked) { - RAY_CHECK_OK( - gcs_client_->Nodes().AsyncUnregister(node_id, /* callback */ nullptr)); - // Broadcast a warning to all of the drivers indicating that the node - // has been marked as dead. - // TODO(rkn): Define this constant somewhere else. - std::string type = "node_removed"; - std::ostringstream error_message; - error_message << "The node with client ID " << node_id - << " has been marked dead because the monitor" - << " has missed too many heartbeats from it."; - auto error_data_ptr = - gcs::CreateErrorTableData(type, error_message.str(), current_time_ms()); - RAY_CHECK_OK( - gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); - } - }; - RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(lookup_callback)); - dead_nodes_.insert(node_id); - } - it = heartbeats_.erase(it); - } else { - it++; - } - } - - // Send any buffered heartbeats as a single publish. - if (!heartbeat_buffer_.empty()) { - auto batch = std::make_shared(); - for (const auto &heartbeat : heartbeat_buffer_) { - batch->add_batch()->CopyFrom(heartbeat.second); - } - RAY_CHECK_OK(gcs_client_->Nodes().AsyncReportBatchHeartbeat(batch, nullptr)); - heartbeat_buffer_.clear(); - } - - auto heartbeat_period = boost::posix_time::milliseconds( - RayConfig::instance().raylet_heartbeat_timeout_milliseconds()); - heartbeat_timer_.expires_from_now(heartbeat_period); - heartbeat_timer_.async_wait([this](const boost::system::error_code &error) { - RAY_CHECK(!error); - Tick(); - }); -} - -} // namespace raylet - -} // namespace ray diff --git a/src/ray/raylet/monitor.h b/src/ray/raylet/monitor.h deleted file mode 100644 index 17a4006f9..000000000 --- a/src/ray/raylet/monitor.h +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include - -#include "ray/common/id.h" -#include "ray/gcs/redis_gcs_client.h" - -namespace ray { - -namespace raylet { - -using rpc::GcsNodeInfo; -using rpc::HeartbeatBatchTableData; -using rpc::HeartbeatTableData; - -class Monitor { - public: - /// Create a Raylet monitor attached to the given GCS address and port. - /// - /// \param io_service The event loop to run the monitor on. - /// \param redis_address The GCS Redis address to connect to. - /// \param redis_port The GCS Redis port to connect to. - Monitor(boost::asio::io_service &io_service, - const gcs::GcsClientOptions &gcs_client_options); - - /// Start the monitor. Listen for heartbeats from Raylets and mark Raylets - /// that do not send a heartbeat within a given period as dead. - void Start(); - - /// A periodic timer that fires on every heartbeat period. Raylets that have - /// not sent a heartbeat within the last num_heartbeats_timeout ticks will be - /// marked as dead in the client table. - void Tick(); - - /// Handle a heartbeat from a Raylet. - /// - /// \param client_id The client ID of the Raylet that sent the heartbeat. - /// \param heartbeat_data The heartbeat sent by the client. - void HandleHeartbeat(const ClientID &client_id, - const HeartbeatTableData &heartbeat_data); - - private: - /// A client to the GCS, through which heartbeats are received. - std::unique_ptr gcs_client_; - /// The number of heartbeats that can be missed before a client is removed. - int64_t num_heartbeats_timeout_; - /// A timer that ticks every heartbeat_timeout_ms_ milliseconds. - boost::asio::deadline_timer heartbeat_timer_; - /// For each Raylet that we receive a heartbeat from, the number of ticks - /// that may pass before the Raylet will be declared dead. - std::unordered_map heartbeats_; - /// The Raylets that have been marked as dead in gcs. - std::unordered_set dead_nodes_; - /// A buffer containing heartbeats received from node managers in the last tick. - std::unordered_map heartbeat_buffer_; -}; - -} // namespace raylet - -} // namespace ray diff --git a/src/ray/raylet/monitor_main.cc b/src/ray/raylet/monitor_main.cc deleted file mode 100644 index 308db3dcd..000000000 --- a/src/ray/raylet/monitor_main.cc +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include "ray/common/ray_config.h" -#include "ray/raylet/monitor.h" -#include "ray/util/util.h" - -#include "gflags/gflags.h" - -DEFINE_string(redis_address, "", "The ip address of redis."); -DEFINE_int32(redis_port, -1, "The port of redis."); -DEFINE_string(config_list, "", "The config list of raylet."); -DEFINE_string(redis_password, "", "The password of redis."); - -int main(int argc, char *argv[]) { - InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog, - ray::RayLog::ShutDownRayLog, argv[0], - ray::RayLogLevel::INFO, /*log_dir=*/""); - ray::RayLog::InstallFailureSignalHandler(); - - gflags::ParseCommandLineFlags(&argc, &argv, true); - const std::string redis_address = FLAGS_redis_address; - const int redis_port = static_cast(FLAGS_redis_port); - const std::string config_list = FLAGS_config_list; - const std::string redis_password = FLAGS_redis_password; - gflags::ShutDownCommandLineFlags(); - - ray::gcs::GcsClientOptions gcs_client_options(redis_address, redis_port, - redis_password); - - std::unordered_map raylet_config; - - // Parse the configuration list. - std::istringstream config_string(config_list); - std::string config_name; - std::string config_value; - - while (std::getline(config_string, config_name, ',')) { - RAY_CHECK(std::getline(config_string, config_value, ',')); - // TODO(rkn): The line below could throw an exception. What should we do about this? - raylet_config[config_name] = config_value; - } - - RayConfig::instance().initialize(raylet_config); - - boost::asio::io_service io_service; - - // The code below is commented out because it appears to introduce a double - // free error in the raylet monitor. - // // Destroy the Raylet monitor on a SIGTERM. The pointer to io_service is - // // guaranteed to be valid since this function will run the event loop - // // instead of returning immediately. - // auto handler = [&io_service](const boost::system::error_code &error, - // int signal_number) { io_service.stop(); }; - // boost::asio::signal_set signals(io_service); - // #ifdef _WIN32 - // signals.add(SIGBREAK); - // #else - // signals.add(SIGTERM); - // #endif - // signals.async_wait(handler); - - // Initialize the monitor. - ray::raylet::Monitor monitor(io_service, gcs_client_options); - monitor.Start(); - io_service.run(); -}