[GCS]Tie lifecycle of gcs service and redis together (#7601)

This commit is contained in:
fangfengbin
2020-03-19 19:52:35 +08:00
committed by GitHub
parent b499100a88
commit 0d0a41f598
7 changed files with 173 additions and 8 deletions
+3
View File
@@ -258,6 +258,9 @@ RAY_CONFIG(uint32_t, object_store_get_max_ids_to_print_in_warning, 20)
RAY_CONFIG(int64_t, gcs_service_connect_retries, 50)
/// Waiting time for each gcs service connection.
RAY_CONFIG(int64_t, internal_gcs_service_connect_wait_milliseconds, 100)
/// The interval at which the gcs server will check if redis has gone down.
/// When this happens, gcs server will kill itself.
RAY_CONFIG(int64_t, gcs_redis_heartbeat_interval_milliseconds, 100)
/// Maximum number of times to retry putting an object when the plasma store is full.
/// Can be set to -1 to enable unlimited retries.
+3 -1
View File
@@ -41,7 +41,9 @@ void RedisServiceManagerForTest::TearDownTestCase() {
std::string stop_redis_command =
REDIS_CLIENT_EXEC_PATH + " -p " + std::to_string(REDIS_SERVER_PORT) + " shutdown";
RAY_LOG(INFO) << "Stop redis command is: " << stop_redis_command;
RAY_CHECK(system(stop_redis_command.c_str()) == 0);
if (system(stop_redis_command.c_str()) != 0) {
RAY_LOG(WARNING) << "Failed to stop redis. The redis process may no longer exist.";
}
usleep(100 * 1000);
}
@@ -36,11 +36,12 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
config.is_test = true;
config.redis_port = REDIS_SERVER_PORT;
gcs_server_.reset(new gcs::GcsServer(config));
io_service_.reset(new boost::asio::io_service());
thread_io_service_.reset(new std::thread([this] {
std::unique_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(io_service_));
io_service_.run();
new boost::asio::io_service::work(*io_service_));
io_service_->run();
}));
thread_gcs_server_.reset(new std::thread([this] { gcs_server_->Start(); }));
@@ -54,12 +55,12 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
gcs::GcsClientOptions options(config.redis_address, config.redis_port,
config.redis_password, config.is_test);
gcs_client_.reset(new gcs::ServiceBasedGcsClient(options));
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
RAY_CHECK_OK(gcs_client_->Connect(*io_service_));
}
void TearDown() override {
gcs_server_->Stop();
io_service_.stop();
io_service_->stop();
thread_io_service_->join();
thread_gcs_server_->join();
gcs_client_->Disconnect();
@@ -340,7 +341,7 @@ class ServiceBasedGcsGcsClientTest : public RedisServiceManagerForTest {
std::unique_ptr<gcs::GcsServer> gcs_server_;
std::unique_ptr<std::thread> thread_io_service_;
std::unique_ptr<std::thread> thread_gcs_server_;
boost::asio::io_service io_service_;
std::unique_ptr<boost::asio::io_service> io_service_;
// Gcs client
std::unique_ptr<gcs::GcsClient> gcs_client_;
@@ -646,6 +647,14 @@ TEST_F(ServiceBasedGcsGcsClientTest, TestDetectGcsAvailability) {
promise.get_future().get();
}
TEST_F(ServiceBasedGcsGcsClientTest, TestGcsRedisFailureDetector) {
// Stop redis.
TearDownTestCase();
// Check if gcs server has exited.
RAY_CHECK(gcs_server_->IsStopped());
}
} // namespace ray
int main(int argc, char **argv) {
@@ -0,0 +1,64 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "gcs_redis_failure_detector.h"
#include "ray/common/ray_config.h"
namespace ray {
namespace gcs {
GcsRedisFailureDetector::GcsRedisFailureDetector(
boost::asio::io_service &io_service, std::shared_ptr<RedisContext> redis_context,
std::function<void()> callback)
: redis_context_(redis_context),
detect_timer_(io_service),
callback_(std::move(callback)) {}
void GcsRedisFailureDetector::Start() {
RAY_LOG(INFO) << "Starting redis failure detector.";
Tick();
}
void GcsRedisFailureDetector::DetectRedis() {
auto *reply = reinterpret_cast<redisReply *>(
redisCommand(redis_context_->sync_context(), "PING"));
if (reply == nullptr || reply->type == REDIS_REPLY_NIL) {
RAY_LOG(ERROR) << "Redis is inactive.";
callback_();
} else {
freeReplyObject(reply);
}
}
/// A periodic timer that checks for timed out clients.
void GcsRedisFailureDetector::Tick() {
DetectRedis();
ScheduleTick();
}
void GcsRedisFailureDetector::ScheduleTick() {
auto detect_period = boost::posix_time::milliseconds(
RayConfig::instance().gcs_redis_heartbeat_interval_milliseconds());
detect_timer_.expires_from_now(detect_period);
detect_timer_.async_wait([this](const boost::system::error_code &error) {
if (error == boost::system::errc::operation_canceled) {
return;
}
RAY_CHECK(!error) << "Detecting redis failed with error: " << error.message();
Tick();
});
}
} // namespace gcs
} // namespace ray
@@ -0,0 +1,71 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RAY_GCS_REDIS_FAILURE_DETECTOR_H
#define RAY_GCS_REDIS_FAILURE_DETECTOR_H
#include <boost/asio.hpp>
#include "ray/gcs/redis_context.h"
namespace ray {
namespace gcs {
class RedisGcsClient;
/// GcsRedisFailureDetector is responsible for monitoring redis and binding GCS server and
/// redis life cycle together. GCS client subscribes to redis messages and it cannot sense
/// whether the redis is inactive unless we go to ping redis voluntarily. But there are
/// many GCS clients, if they all Ping redis, the redis load will be high. So we ping
/// redis on GCS server and GCS client can sense whether redis is normal through RPC
/// connection with GCS server.
class GcsRedisFailureDetector {
public:
/// Create a GcsRedisFailureDetector.
///
/// \param io_service The event loop to run the monitor on.
/// \param redis_context The redis context is used to ping redis.
/// \param callback Callback that will be called when redis is detected as not alive.
explicit GcsRedisFailureDetector(boost::asio::io_service &io_service,
std::shared_ptr<RedisContext> redis_context,
std::function<void()> callback);
/// Start detecting redis.
void Start();
protected:
/// A periodic timer that fires on every gcs detect period.
void Tick();
/// Schedule another tick after a short time.
void ScheduleTick();
/// Check that if redis is inactive.
void DetectRedis();
private:
/// A redis context is used to ping redis.
/// TODO(ffbin): We will use redis client later.
std::shared_ptr<RedisContext> redis_context_;
/// A timer that ticks every gcs_detect_timeout_milliseconds.
boost::asio::deadline_timer detect_timer_;
/// A function is called when redis is detected to be unavailable.
std::function<void()> callback_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_FAILURE_DETECTOR_H
+9
View File
@@ -40,6 +40,11 @@ void GcsServer::Start() {
// Init gcs node_manager
InitGcsNodeManager();
// Init gcs detector
gcs_redis_failure_detector_ = std::make_shared<GcsRedisFailureDetector>(
main_service_, redis_gcs_client_->primary_context(), [this]() { Stop(); });
gcs_redis_failure_detector_->Start();
// Register rpc service.
job_info_handler_ = InitJobInfoHandler();
job_info_service_.reset(new rpc::JobInfoGrpcService(main_service_, *job_info_handler_));
@@ -94,11 +99,15 @@ void GcsServer::Start() {
}
void GcsServer::Stop() {
RAY_LOG(INFO) << "Stopping gcs server.";
// Shutdown the rpc server
rpc_server_.Shutdown();
// Stop the event loop.
main_service_.stop();
is_stopped_ = true;
RAY_LOG(INFO) << "Finished stopping gcs server.";
}
void GcsServer::InitBackendClient() {
+9 -2
View File
@@ -17,6 +17,7 @@
#include <ray/gcs/redis_gcs_client.h>
#include <ray/rpc/gcs_server/gcs_rpc_server.h>
#include "ray/gcs/gcs_server/gcs_redis_failure_detector.h"
namespace ray {
namespace gcs {
@@ -54,9 +55,12 @@ class GcsServer {
/// Get the port of this gcs server.
int GetPort() const { return rpc_server_.GetPort(); }
/// Check if gcs server is started
/// Check if gcs server is started.
bool IsStarted() const { return is_started_; }
/// Check if gcs server is stopped.
bool IsStopped() const { return is_stopped_; }
protected:
/// Initialize the backend storage client.
/// The gcs server is just the proxy between the gcs client and reliable storage
@@ -108,6 +112,8 @@ class GcsServer {
boost::asio::io_context main_service_;
/// The gcs node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
/// The gcs redis failure detector.
std::shared_ptr<GcsRedisFailureDetector> gcs_redis_failure_detector_;
/// Job info handler and service
std::unique_ptr<rpc::JobInfoHandler> job_info_handler_;
std::unique_ptr<rpc::JobInfoGrpcService> job_info_service_;
@@ -134,8 +140,9 @@ class GcsServer {
std::unique_ptr<rpc::WorkerInfoGrpcService> worker_info_service_;
/// Backend client
std::shared_ptr<RedisGcsClient> redis_gcs_client_;
/// Gcs service init flag
/// Gcs service state flag, which is used for ut.
bool is_started_ = false;
bool is_stopped_ = false;
};
} // namespace gcs