diff --git a/python/ray/autoscaler/_private/commands.py b/python/ray/autoscaler/_private/commands.py index 0dd68848e..5647b717a 100644 --- a/python/ray/autoscaler/_private/commands.py +++ b/python/ray/autoscaler/_private/commands.py @@ -20,14 +20,13 @@ except ImportError: # py2 from pipes import quote import ray -from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put +from ray.experimental.internal_kv import _internal_kv_put import ray._private.services as services from ray.autoscaler.node_provider import NodeProvider from ray.autoscaler._private.constants import \ AUTOSCALER_RESOURCE_REQUEST_CHANNEL from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \ - hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \ - DEBUG_AUTOSCALING_STATUS + hash_launch_conf, prepare_config from ray.autoscaler._private.providers import _get_node_provider, \ _NODE_PROVIDERS, _PROVIDER_PRETTY_NAMES from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \ @@ -90,10 +89,8 @@ def try_reload_log_state(provider_config: Dict[str, Any], return reload_log_state(log_state) -def debug_status() -> str: +def debug_status(status, error) -> str: """Return a debug string for the autoscaler.""" - status = _internal_kv_get(DEBUG_AUTOSCALING_STATUS) - error = _internal_kv_get(DEBUG_AUTOSCALING_ERROR) if not status: status = "No cluster status." else: diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 999e49220..67c1a9275 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -13,7 +13,7 @@ def global_gc(): worker.core_worker.global_gc() -def memory_summary(): +def memory_summary(node_manager_address=None, node_manager_port=None): """Returns a formatted string describing memory usage in the cluster.""" import grpc @@ -22,9 +22,13 @@ def memory_summary(): # We can ask any Raylet for the global memory info, that Raylet internally # asks all nodes in the cluster for memory stats. - raylet = ray.nodes()[0] - raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], - ray.nodes()[0]["NodeManagerPort"]) + if (node_manager_address is None or node_manager_port is None): + raylet = ray.nodes()[0] + raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], + raylet["NodeManagerPort"]) + else: + raylet_address = "{}:{}".format(node_manager_address, + node_manager_port) channel = grpc.insecure_channel( raylet_address, options=[ diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 11941f87a..d73e4335d 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -19,6 +19,9 @@ from ray.autoscaler._private.commands import ( attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster, rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips, debug_status, RUN_ENV_TYPES) +from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \ + DEBUG_AUTOSCALING_STATUS +from ray.state import GlobalState import ray.ray_constants as ray_constants import ray.utils @@ -1363,9 +1366,12 @@ def memory(address, redis_password): """Print object references held in a Ray cluster.""" if not address: address = services.get_ray_address_to_use_or_die() - logger.info(f"Connecting to Ray instance at {address}.") - ray.init(address=address, _redis_password=redis_password) - print(ray.internal.internal_api.memory_summary()) + state = GlobalState() + state._initialize_global_state(address, redis_password) + raylet = state.node_table()[0] + print( + ray.internal.internal_api.memory_summary(raylet["NodeManagerAddress"], + raylet["NodeManagerPort"])) @cli.command() @@ -1374,13 +1380,21 @@ def memory(address, redis_password): required=False, type=str, help="Override the address to connect to.") -def status(address): +@click.option( + "--redis_password", + required=False, + type=str, + default=ray_constants.REDIS_DEFAULT_PASSWORD, + help="Connect to ray with redis_password.") +def status(address, redis_password): """Print cluster status, including autoscaling info.""" if not address: address = services.get_ray_address_to_use_or_die() - logger.info(f"Connecting to Ray instance at {address}.") - ray.init(address=address) - print(debug_status()) + redis_client = ray._private.services.create_redis_client( + address, redis_password) + status = redis_client.hget(DEBUG_AUTOSCALING_STATUS, "value") + error = redis_client.hget(DEBUG_AUTOSCALING_ERROR, "value") + print(debug_status(status, error)) @cli.command(hidden=True)