Add RayParams to refactor the parameters used by ray python. (#3558)

This commit is contained in:
Yuhong Guo
2018-12-29 22:04:27 +08:00
committed by Hao Chen
parent eb1e5fa2cf
commit c9b8ecca51
12 changed files with 543 additions and 697 deletions
+32 -44
View File
@@ -15,6 +15,8 @@ from ray.autoscaler.commands import (attach_cluster, exec_cluster,
import ray.ray_constants as ray_constants
import ray.utils
from ray.parameter import RayParams
logger = logging.getLogger(__name__)
@@ -243,6 +245,22 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
resources["CPU"] = num_cpus
if num_gpus is not None:
resources["GPU"] = num_gpus
ray_params = RayParams(
node_ip_address=node_ip_address,
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
num_workers=num_workers,
object_store_memory=object_store_memory,
redis_password=redis_password,
redirect_worker_output=not no_redirect_worker_output,
redirect_output=not no_redirect_output,
resources=resources,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
_internal_config=internal_config)
if head:
# Start Ray on the head node.
@@ -266,36 +284,21 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
"provided.")
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address()
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address())
logger.info("Using IP address {} for this node."
.format(node_ip_address))
address_info = services.start_ray_head(
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
node_ip_address=node_ip_address,
.format(ray_params.node_ip_address))
ray_params.update_if_absent(
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
num_workers=num_workers,
cleanup=False,
redirect_worker_output=not no_redirect_worker_output,
redirect_output=not no_redirect_output,
resources=resources,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
redis_password=redis_password,
include_webui=(not no_ui),
plasma_directory=plasma_directory,
huge_pages=huge_pages,
autoscaling_config=autoscaling_config,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
_internal_config=internal_config)
autoscaling_config=autoscaling_config)
address_info = services.start_ray_head(ray_params, cleanup=False)
logger.info(address_info)
logger.info(
"\nStarted Ray on this node. You can add additional nodes to "
@@ -350,32 +353,17 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
services.check_version_info(redis_client)
# Get the node IP address if one is not provided.
if node_ip_address is None:
node_ip_address = services.get_node_ip_address(redis_address)
ray_params.update_if_absent(
node_ip_address=services.get_node_ip_address(redis_address))
logger.info("Using IP address {} for this node."
.format(node_ip_address))
.format(ray_params.node_ip_address))
# Check that there aren't already Redis clients with the same IP
# address connected with this Redis instance. This raises an exception
# if the Redis server already has clients on this node.
check_no_existing_redis_clients(node_ip_address, redis_client)
address_info = services.start_ray_node(
node_ip_address=node_ip_address,
redis_address=redis_address,
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
num_workers=num_workers,
object_store_memory=object_store_memory,
redis_password=redis_password,
cleanup=False,
redirect_worker_output=not no_redirect_worker_output,
redirect_output=not no_redirect_output,
resources=resources,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
plasma_store_socket_name=plasma_store_socket_name,
raylet_socket_name=raylet_socket_name,
temp_dir=temp_dir,
_internal_config=internal_config)
check_no_existing_redis_clients(ray_params.node_ip_address,
redis_client)
ray_params.redis_address = redis_address
address_info = services.start_ray_node(ray_params, cleanup=False)
logger.info(address_info)
logger.info("\nStarted Ray on this node. If you wish to terminate the "
"processes that have been started, run\n\n"