diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index a57da6984..7b6ada54c 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -258,6 +258,9 @@ RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20) RAY_CONFIG(int64_t, gcs_service_connect_retries, 50) /// Waiting time for each gcs service connection. RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100) +/// The interval at which the gcs server will check if redis has gone down. +/// When this happens, gcs server will kill itself. +RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100) /// Maximum number of times to retry putting an object when the plasma store is full. /// Can be set to -1 to enable unlimited retries. diff --git a/src/ray/common/test_util.cc b/src/ray/common/test_util.cc index 683827046..436565dbc 100644 --- a/src/ray/common/test_util.cc +++ b/src/ray/common/test_util.cc @@ -41,7 +41,9 @@ void RedisServiceManagerForTest::TearDownTestCase() { std::string stop_redis_command = REDIS_CLIENT_EXEC_PATH + " -p " + std::to_string(REDIS_SERVER_PORT) + " shutdown"; RAY_LOG(INFO) << "Stop redis command is: " << stop_redis_command; - RAY_CHECK(system(stop_redis_command.c_str()) == 0); + if (system(stop_redis_command.c_str()) != 0) { + RAY_LOG(WARNING) << "Failed to stop redis. The redis process may no longer exist."; + } usleep(100 * 1000); } diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 93b91284d..a3cbbdba4 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -36,11 +36,12 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest { config.is_test = true; config.redis_port = REDIS_SERVER_PORT; gcs_server_.reset(new gcs::GcsServer(config)); + io_service_.reset(new boost::asio::io_service()); thread_io_service_.reset(new std::thread([this] { std::unique_ptr work( - new boost::asio::io_service::work(io_service_)); - io_service_.run(); + new boost::asio::io_service::work(*io_service_)); + io_service_->run(); })); thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); })); @@ -54,12 +55,12 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest { gcs::GcsClientOptions options(config.redis_address, config.redis_port, config.redis_password, config.is_test); gcs_client_.reset(new gcs::ServiceBasedGcsClient(options)); - RAY_CHECK_OK(gcs_client_->Connect(io_service_)); + RAY_CHECK_OK(gcs_client_->Connect(*io_service_)); } void TearDown() override { gcs_server_->Stop(); - io_service_.stop(); + io_service_->stop(); thread_io_service_->join(); thread_gcs_server_->join(); gcs_client_->Disconnect(); @@ -340,7 +341,7 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest { std::unique_ptr gcs_server_; std::unique_ptr thread_io_service_; std::unique_ptr thread_gcs_server_; - boost::asio::io_service io_service_; + std::unique_ptr io_service_; // Gcs client std::unique_ptr gcs_client_; @@ -646,6 +647,14 @@ TEST_F(ServiceBasedGcsGcsClientTest, TestDetectGcsAvailability) { promise.get_future().get(); } +TEST_F(ServiceBasedGcsGcsClientTest, TestGcsRedisFailureDetector) { + // Stop redis. + TearDownTestCase(); + + // Check if gcs server has exited. + RAY_CHECK(gcs_server_->IsStopped()); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc new file mode 100644 index 000000000..adf148b35 --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.cc @@ -0,0 +1,64 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "gcs_redis_failure_detector.h" +#include "ray/common/ray_config.h" + +namespace ray { +namespace gcs { + +GcsRedisFailureDetector::GcsRedisFailureDetector( + boost::asio::io_service &io_service, std::shared_ptr redis_context, + std::function callback) + : redis_context_(redis_context), + detect_timer_(io_service), + callback_(std::move(callback)) {} + +void GcsRedisFailureDetector::Start() { + RAY_LOG(INFO) << "Starting redis failure detector."; + Tick(); +} + +void GcsRedisFailureDetector::DetectRedis() { + auto *reply = reinterpret_cast( + redisCommand(redis_context_->sync_context(), "PING")); + if (reply == nullptr || reply->type == REDIS_REPLY_NIL) { + RAY_LOG(ERROR) << "Redis is inactive."; + callback_(); + } else { + freeReplyObject(reply); + } +} + +/// A periodic timer that checks for timed out clients. +void GcsRedisFailureDetector::Tick() { + DetectRedis(); + ScheduleTick(); +} + +void GcsRedisFailureDetector::ScheduleTick() { + auto detect_period = boost::posix_time::milliseconds( + RayConfig::instance().gcs_redis_heartbeat_interval_milliseconds()); + detect_timer_.expires_from_now(detect_period); + detect_timer_.async_wait([this](const boost::system::error_code &error) { + if (error == boost::system::errc::operation_canceled) { + return; + } + RAY_CHECK(!error) << "Detecting redis failed with error: " << error.message(); + Tick(); + }); +} + +} // namespace gcs +} // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h new file mode 100644 index 000000000..f3d42534c --- /dev/null +++ b/src/ray/gcs/gcs_server/gcs_redis_failure_detector.h @@ -0,0 +1,71 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RAY_GCS_REDIS_FAILURE_DETECTOR_H +#define RAY_GCS_REDIS_FAILURE_DETECTOR_H + +#include +#include "ray/gcs/redis_context.h" + +namespace ray { + +namespace gcs { +class RedisGcsClient; + +/// GcsRedisFailureDetector is responsible for monitoring redis and binding GCS server and +/// redis life cycle together. GCS client subscribes to redis messages and it cannot sense +/// whether the redis is inactive unless we go to ping redis voluntarily. But there are +/// many GCS clients, if they all Ping redis, the redis load will be high. So we ping +/// redis on GCS server and GCS client can sense whether redis is normal through RPC +/// connection with GCS server. +class GcsRedisFailureDetector { + public: + /// Create a GcsRedisFailureDetector. + /// + /// \param io_service The event loop to run the monitor on. + /// \param redis_context The redis context is used to ping redis. + /// \param callback Callback that will be called when redis is detected as not alive. + explicit GcsRedisFailureDetector(boost::asio::io_service &io_service, + std::shared_ptr redis_context, + std::function callback); + + /// Start detecting redis. + void Start(); + + protected: + /// A periodic timer that fires on every gcs detect period. + void Tick(); + + /// Schedule another tick after a short time. + void ScheduleTick(); + + /// Check that if redis is inactive. + void DetectRedis(); + + private: + /// A redis context is used to ping redis. + /// TODO(ffbin): We will use redis client later. + std::shared_ptr redis_context_; + + /// A timer that ticks every gcs_detect_timeout_milliseconds. + boost::asio::deadline_timer detect_timer_; + + /// A function is called when redis is detected to be unavailable. + std::function callback_; +}; + +} // namespace gcs +} // namespace ray + +#endif // RAY_GCS_REDIS_FAILURE_DETECTOR_H diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index aad7e05eb..499bba6d2 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -40,6 +40,11 @@ void GcsServer::Start() { // Init gcs node_manager InitGcsNodeManager(); + // Init gcs detector + gcs_redis_failure_detector_ = std::make_shared( + main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); }); + gcs_redis_failure_detector_->Start(); + // Register rpc service. job_info_handler_ = InitJobInfoHandler(); job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *job_info_handler_)); @@ -94,11 +99,15 @@ void GcsServer::Start() { } void GcsServer::Stop() { + RAY_LOG(INFO) << "Stopping gcs server."; // Shutdown the rpc server rpc_server_.Shutdown(); // Stop the event loop. main_service_.stop(); + + is_stopped_ = true; + RAY_LOG(INFO) << "Finished stopping gcs server."; } void GcsServer::InitBackendClient() { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 9a8ea226d..b36725325 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -17,6 +17,7 @@ #include #include +#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h" namespace ray { namespace gcs { @@ -54,9 +55,12 @@ class GcsServer { /// Get the port of this gcs server. int GetPort() const { return rpc_server_.GetPort(); } - /// Check if gcs server is started + /// Check if gcs server is started. bool IsStarted() const { return is_started_; } + /// Check if gcs server is stopped. + bool IsStopped() const { return is_stopped_; } + protected: /// Initialize the backend storage client. /// The gcs server is just the proxy between the gcs client and reliable storage @@ -108,6 +112,8 @@ class GcsServer { boost::asio::io_context main_service_; /// The gcs node manager. std::shared_ptr gcs_node_manager_; + /// The gcs redis failure detector. + std::shared_ptr gcs_redis_failure_detector_; /// Job info handler and service std::unique_ptr job_info_handler_; std::unique_ptr job_info_service_; @@ -134,8 +140,9 @@ class GcsServer { std::unique_ptr worker_info_service_; /// Backend client std::shared_ptr redis_gcs_client_; - /// Gcs service init flag + /// Gcs service state flag, which is used for ut. bool is_started_ = false; + bool is_stopped_ = false; }; } // namespace gcs