[Core] Allow to pass node ip address to gcs server. (#10946)

* Allow to pass node ip address to gcs server.

* Fix.

* Addressed code review.

* Fixed an error.

* Addressed code review.
This commit is contained in:
SangBin Cho
2020-09-23 01:52:26 -07:00
committed by GitHub
parent ba5a3ae9e2
commit 390107b6cb
5 changed files with 17 additions and 7 deletions
+2 -1
View File
@@ -680,7 +680,8 @@ class Node:
config=self._config,
fate_share=self.kernel_fate_share,
gcs_server_port=self._ray_params.gcs_server_port,
metrics_agent_port=self._ray_params.metrics_agent_port)
metrics_agent_port=self._ray_params.metrics_agent_port,
node_ip_address=self._node_ip_address)
assert (
ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
+4 -1
View File
@@ -1097,7 +1097,8 @@ def start_gcs_server(redis_address,
config=None,
fate_share=None,
gcs_server_port=None,
metrics_agent_port=None):
metrics_agent_port=None,
node_ip_address=None):
"""Start a gcs server.
Args:
redis_address (str): The address that the Redis server is listening on.
@@ -1110,6 +1111,7 @@ def start_gcs_server(redis_address,
override defaults in RayConfig.
gcs_server_port (int): Port number of the gcs server.
metrics_agent_port(int): The port where metrics agent is bound to.
node_ip_address(str): IP Address of a node where gcs server starts.
Returns:
ProcessInfo for the process that was started.
"""
@@ -1126,6 +1128,7 @@ def start_gcs_server(redis_address,
f"--config_list={config_str}",
f"--gcs_server_port={gcs_server_port}",
f"--metrics-agent-port={metrics_agent_port}",
f"--node-ip-address={node_ip_address}",
]
if redis_password:
command += [f"--redis_password={redis_password}"]
+7 -5
View File
@@ -252,11 +252,13 @@ std::unique_ptr<GcsObjectManager> GcsServer::InitObjectManager() {
}
void GcsServer::StoreGcsServerAddressInRedis() {
std::string address =
GetValidLocalIp(
GetPort(),
RayConfig::instance().internal_gcs_service_connect_wait_milliseconds()) +
":" + std::to_string(GetPort());
std::string ip = config_.node_ip_address;
if (ip.empty()) {
ip = GetValidLocalIp(
GetPort(),
RayConfig::instance().internal_gcs_service_connect_wait_milliseconds());
}
std::string address = ip + ":" + std::to_string(GetPort());
RAY_LOG(INFO) << "Gcs server address = " << address;
RAY_CHECK_OK(redis_gcs_client_->primary_context()->RunArgvAsync(
+1
View File
@@ -34,6 +34,7 @@ struct GcsServerConfig {
uint16_t redis_port = 6379;
bool retry_redis = true;
bool is_test = false;
std::string node_ip_address;
};
class GcsNodeManager;
@@ -27,6 +27,7 @@ DEFINE_int32(metrics_agent_port, -1, "The port of metrics agent.");
DEFINE_string(config_list, "", "The config list of raylet.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_bool(retry_redis, false, "Whether we retry to connect to the redis.");
DEFINE_string(node_ip_address, "", "The ip address of the node.");
int main(int argc, char *argv[]) {
InitShutdownRAII ray_log_shutdown_raii(ray::RayLog::StartRayLog,
@@ -42,6 +43,7 @@ int main(int argc, char *argv[]) {
const std::string config_list = FLAGS_config_list;
const std::string redis_password = FLAGS_redis_password;
const bool retry_redis = FLAGS_retry_redis;
const std::string node_ip_address = FLAGS_node_ip_address;
gflags::ShutDownCommandLineFlags();
std::unordered_map<std::string, std::string> config_map;
@@ -75,6 +77,7 @@ int main(int argc, char *argv[]) {
gcs_server_config.redis_port = redis_port;
gcs_server_config.redis_password = redis_password;
gcs_server_config.retry_redis = retry_redis;
gcs_server_config.node_ip_address = node_ip_address;
ray::gcs::GcsServer gcs_server(gcs_server_config, main_service);
// Destroy the GCS server on a SIGTERM. The pointer to main_service is