Extract the connection logic to reduce duplication. (#12016)

This commit is contained in:
dHannasch
2020-11-18 01:12:58 -07:00
committed by GitHub
parent d23d326560
commit b41f4fdec2
+30 -10
View File
@@ -284,21 +284,40 @@ void SetDisconnectCallback(RedisAsyncContext *redis_async_context) {
RedisAsyncContextDisconnectCallback);
}
template <typename RedisContext, typename RedisConnectFunction>
Status ConnectWithoutRetries(const std::string &address, int port,
const RedisConnectFunction &connect_function,
RedisContext **context, std::string &errorMessage) {
// This currently returns the errorMessage in two different ways,
// as an output parameter and in the Status::RedisError,
// because we're not sure whether we'll want to change what this returns.
*context = connect_function(address.c_str(), port);
if (*context == nullptr || (*context)->err) {
std::ostringstream oss(errorMessage);
if (*context == nullptr) {
oss << "Could not allocate Redis context.";
} else if ((*context)->err) {
oss << "Could not establish connection to Redis " << address << ":" << port
<< " (context.err = " << (*context)->err << ")";
}
return Status::RedisError(errorMessage);
}
return Status::OK();
}
template <typename RedisContext, typename RedisConnectFunction>
Status ConnectWithRetries(const std::string &address, int port,
const RedisConnectFunction &connect_function,
RedisContext **context) {
int connection_attempts = 0;
*context = connect_function(address.c_str(), port);
while (*context == nullptr || (*context)->err) {
std::string errorMessage = "";
Status status =
ConnectWithoutRetries(address, port, connect_function, context, errorMessage);
while (!status.ok()) {
if (connection_attempts >= RayConfig::instance().redis_db_connect_retries()) {
if (*context == nullptr) {
RAY_LOG(FATAL) << "Could not allocate redis context.";
}
if ((*context)->err) {
RAY_LOG(FATAL) << "Could not establish connection to redis " << address << ":"
<< port << " (context.err = " << (*context)->err << ")";
}
RAY_LOG(FATAL) << RayConfig::instance().redis_db_connect_retries() << " attempts "
<< "to connect have all failed. The last error message was: "
<< errorMessage;
break;
}
if (*context == nullptr) {
@@ -316,7 +335,8 @@ Status ConnectWithRetries(const std::string &address, int port,
// Sleep for a little.
std::this_thread::sleep_for(std::chrono::milliseconds(
RayConfig::instance().redis_db_connect_wait_milliseconds()));
*context = connect_function(address.c_str(), port);
status =
ConnectWithoutRetries(address, port, connect_function, context, errorMessage);
connection_attempts += 1;
}
return Status::OK();