[Core] Support GCS server port assignment. (#8962)

This commit is contained in:
SangBin Cho
2020-07-14 09:49:56 -07:00
committed by GitHub
parent f6eb47fc1f
commit 539c51a003
5 changed files with 31 additions and 10 deletions
+2 -1
View File
@@ -630,7 +630,8 @@ class Node:
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
config=self._config,
fate_share=self.kernel_fate_share)
fate_share=self.kernel_fate_share,
gcs_server_port=self._ray_params.gcs_server_port)
assert (
ray_constants.PROCESS_TYPE_GCS_SERVER not in self.all_processes)
self.all_processes[ray_constants.PROCESS_TYPE_GCS_SERVER] = [
+3
View File
@@ -33,6 +33,7 @@ class RayParams:
sharded redis tables (task and object tables).
object_manager_port int: The port to use for the object manager.
node_manager_port: The port to use for the node manager.
gcs_server_port: The port to use for the GCS server.
node_ip_address (str): The IP address of the node that we are on.
raylet_ip_address (str): The IP address of the raylet that this node
connects to.
@@ -105,6 +106,7 @@ class RayParams:
redis_shard_ports=None,
object_manager_port=None,
node_manager_port=None,
gcs_server_port=None,
node_ip_address=None,
raylet_ip_address=None,
min_worker_port=None,
@@ -147,6 +149,7 @@ class RayParams:
self.redis_shard_ports = redis_shard_ports
self.object_manager_port = object_manager_port
self.node_manager_port = node_manager_port
self.gcs_server_port = gcs_server_port
self.node_ip_address = node_ip_address
self.raylet_ip_address = raylet_ip_address
self.min_worker_port = min_worker_port
+15 -5
View File
@@ -179,6 +179,11 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
required=False,
type=int,
help="the port to use for starting the node manager")
@click.option(
"--gcs-server-port",
required=False,
type=int,
help="Port number for the GCS server.")
@click.option(
"--min-worker-port",
required=False,
@@ -339,15 +344,19 @@ def dashboard(cluster_config_file, cluster_name, port, remote_port):
def start(node_ip_address, redis_address, address, redis_port, port,
num_redis_shards, redis_max_clients, redis_password,
redis_shard_ports, object_manager_port, node_manager_port,
min_worker_port, max_worker_port, memory, object_store_memory,
redis_max_memory, num_cpus, num_gpus, resources, head, include_webui,
webui_host, include_dashboard, dashboard_host, dashboard_port, block,
plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
gcs_server_port, min_worker_port, max_worker_port, memory,
object_store_memory, redis_max_memory, num_cpus, num_gpus, resources,
head, include_webui, webui_host, include_dashboard, dashboard_host,
dashboard_port, block, plasma_directory, huge_pages,
autoscaling_config, no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir, include_java,
java_worker_options, load_code_from_local, internal_config,
lru_evict):
"""Start Ray processes manually on the local machine."""
if gcs_server_port and not head:
raise ValueError(
"gcs_server_port can be only assigned when you specify --head.")
if redis_address is not None:
raise DeprecationWarning("The --redis-address argument is "
"deprecated. Please use --address instead.")
@@ -399,6 +408,7 @@ def start(node_ip_address, redis_address, address, redis_port, port,
max_worker_port=max_worker_port,
object_manager_port=object_manager_port,
node_manager_port=node_manager_port,
gcs_server_port=gcs_server_port,
memory=memory,
object_store_memory=object_store_memory,
redis_password=redis_password,
+7 -1
View File
@@ -1202,7 +1202,8 @@ def start_gcs_server(redis_address,
stderr_file=None,
redis_password=None,
config=None,
fate_share=None):
fate_share=None,
gcs_server_port=None):
"""Start a gcs server.
Args:
redis_address (str): The address that the Redis server is listening on.
@@ -1213,17 +1214,22 @@ def start_gcs_server(redis_address,
redis_password (str): The password of the redis server.
config (dict|None): Optional configuration that will
override defaults in RayConfig.
gcs_server_port (int): Port number of the gcs server.
Returns:
ProcessInfo for the process that was started.
"""
gcs_ip_address, gcs_port = redis_address.split(":")
redis_password = redis_password or ""
config_str = ",".join(["{},{}".format(*kv) for kv in config.items()])
if gcs_server_port is None:
gcs_server_port = 0
command = [
GCS_SERVER_EXECUTABLE,
"--redis_address={}".format(gcs_ip_address),
"--redis_port={}".format(gcs_port),
"--config_list={}".format(config_str),
"--gcs_server_port={}".format(gcs_server_port),
]
if redis_password:
command += ["--redis_password={}".format(redis_password)]
+4 -3
View File
@@ -14,14 +14,14 @@
#include <iostream>
#include "gflags/gflags.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_server/gcs_server.h"
#include "ray/util/util.h"
#include "gflags/gflags.h"
DEFINE_string(redis_address, "", "The ip address of redis.");
DEFINE_int32(redis_port, -1, "The port of redis.");
DEFINE_int32(gcs_server_port, -1, "The port of gcs server.");
DEFINE_string(config_list, "", "The config list of raylet.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_bool(retry_redis, false, "Whether we retry to connect to the redis.");
@@ -35,6 +35,7 @@ int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const int gcs_server_port = static_cast<int>(FLAGS_gcs_server_port);
const std::string config_list = FLAGS_config_list;
const std::string redis_password = FLAGS_redis_password;
const bool retry_redis = FLAGS_retry_redis;
@@ -58,7 +59,7 @@ int main(int argc, char *argv[]) {
ray::gcs::GcsServerConfig gcs_server_config;
gcs_server_config.grpc_server_name = "GcsServer";
gcs_server_config.grpc_server_port = 0;
gcs_server_config.grpc_server_port = gcs_server_port;
gcs_server_config.grpc_server_thread_num = 1;
gcs_server_config.redis_address = redis_address;
gcs_server_config.redis_port = redis_port;