mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:22:39 +08:00
[Core][CLI] ray status and ray memory no longer starts a new job (#13391)
* Access memory info in ray memory via GlobalStateAccessor rather than calling ray.init() * Modify ray status cli so that it doesn't start a new job via ray.init() * Remove local test file * Access memory info in ray memory via GlobalStateAccessor rather than calling ray.init() * Modify ray status cli so that it doesn't start a new job via ray.init() * Remove local test file * Make status and error args required in commands.py#debug.status * Remove unnecessary imports * Access memory info in ray memory via GlobalStateAccessor rather than calling ray.init() * Modify ray status cli so that it doesn't start a new job via ray.init() * Remove local test file * Access memory info in ray memory via GlobalStateAccessor rather than calling ray.init() * Modify ray status cli so that it doesn't start a new job via ray.init() * Remove local test file * Make status and error args required in commands.py#debug.status * Remove unnecessary imports * Job 38482.1 should now pass * Resolve merge conflict
This commit is contained in:
@@ -20,14 +20,13 @@ except ImportError: # py2
|
||||
from pipes import quote
|
||||
|
||||
import ray
|
||||
from ray.experimental.internal_kv import _internal_kv_get, _internal_kv_put
|
||||
from ray.experimental.internal_kv import _internal_kv_put
|
||||
import ray._private.services as services
|
||||
from ray.autoscaler.node_provider import NodeProvider
|
||||
from ray.autoscaler._private.constants import \
|
||||
AUTOSCALER_RESOURCE_REQUEST_CHANNEL
|
||||
from ray.autoscaler._private.util import validate_config, hash_runtime_conf, \
|
||||
hash_launch_conf, prepare_config, DEBUG_AUTOSCALING_ERROR, \
|
||||
DEBUG_AUTOSCALING_STATUS
|
||||
hash_launch_conf, prepare_config
|
||||
from ray.autoscaler._private.providers import _get_node_provider, \
|
||||
_NODE_PROVIDERS, _PROVIDER_PRETTY_NAMES
|
||||
from ray.autoscaler.tags import TAG_RAY_NODE_KIND, TAG_RAY_LAUNCH_CONFIG, \
|
||||
@@ -90,10 +89,8 @@ def try_reload_log_state(provider_config: Dict[str, Any],
|
||||
return reload_log_state(log_state)
|
||||
|
||||
|
||||
def debug_status() -> str:
|
||||
def debug_status(status, error) -> str:
|
||||
"""Return a debug string for the autoscaler."""
|
||||
status = _internal_kv_get(DEBUG_AUTOSCALING_STATUS)
|
||||
error = _internal_kv_get(DEBUG_AUTOSCALING_ERROR)
|
||||
if not status:
|
||||
status = "No cluster status."
|
||||
else:
|
||||
|
||||
@@ -13,7 +13,7 @@ def global_gc():
|
||||
worker.core_worker.global_gc()
|
||||
|
||||
|
||||
def memory_summary():
|
||||
def memory_summary(node_manager_address=None, node_manager_port=None):
|
||||
"""Returns a formatted string describing memory usage in the cluster."""
|
||||
|
||||
import grpc
|
||||
@@ -22,9 +22,13 @@ def memory_summary():
|
||||
|
||||
# We can ask any Raylet for the global memory info, that Raylet internally
|
||||
# asks all nodes in the cluster for memory stats.
|
||||
raylet = ray.nodes()[0]
|
||||
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
|
||||
ray.nodes()[0]["NodeManagerPort"])
|
||||
if (node_manager_address is None or node_manager_port is None):
|
||||
raylet = ray.nodes()[0]
|
||||
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
|
||||
raylet["NodeManagerPort"])
|
||||
else:
|
||||
raylet_address = "{}:{}".format(node_manager_address,
|
||||
node_manager_port)
|
||||
channel = grpc.insecure_channel(
|
||||
raylet_address,
|
||||
options=[
|
||||
|
||||
@@ -19,6 +19,9 @@ from ray.autoscaler._private.commands import (
|
||||
attach_cluster, exec_cluster, create_or_update_cluster, monitor_cluster,
|
||||
rsync, teardown_cluster, get_head_node_ip, kill_node, get_worker_node_ips,
|
||||
debug_status, RUN_ENV_TYPES)
|
||||
from ray.autoscaler._private.util import DEBUG_AUTOSCALING_ERROR, \
|
||||
DEBUG_AUTOSCALING_STATUS
|
||||
from ray.state import GlobalState
|
||||
import ray.ray_constants as ray_constants
|
||||
import ray.utils
|
||||
|
||||
@@ -1363,9 +1366,12 @@ def memory(address, redis_password):
|
||||
"""Print object references held in a Ray cluster."""
|
||||
if not address:
|
||||
address = services.get_ray_address_to_use_or_die()
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address, _redis_password=redis_password)
|
||||
print(ray.internal.internal_api.memory_summary())
|
||||
state = GlobalState()
|
||||
state._initialize_global_state(address, redis_password)
|
||||
raylet = state.node_table()[0]
|
||||
print(
|
||||
ray.internal.internal_api.memory_summary(raylet["NodeManagerAddress"],
|
||||
raylet["NodeManagerPort"]))
|
||||
|
||||
|
||||
@cli.command()
|
||||
@@ -1374,13 +1380,21 @@ def memory(address, redis_password):
|
||||
required=False,
|
||||
type=str,
|
||||
help="Override the address to connect to.")
|
||||
def status(address):
|
||||
@click.option(
|
||||
"--redis_password",
|
||||
required=False,
|
||||
type=str,
|
||||
default=ray_constants.REDIS_DEFAULT_PASSWORD,
|
||||
help="Connect to ray with redis_password.")
|
||||
def status(address, redis_password):
|
||||
"""Print cluster status, including autoscaling info."""
|
||||
if not address:
|
||||
address = services.get_ray_address_to_use_or_die()
|
||||
logger.info(f"Connecting to Ray instance at {address}.")
|
||||
ray.init(address=address)
|
||||
print(debug_status())
|
||||
redis_client = ray._private.services.create_redis_client(
|
||||
address, redis_password)
|
||||
status = redis_client.hget(DEBUG_AUTOSCALING_STATUS, "value")
|
||||
error = redis_client.hget(DEBUG_AUTOSCALING_ERROR, "value")
|
||||
print(debug_status(status, error))
|
||||
|
||||
|
||||
@cli.command(hidden=True)
|
||||
|
||||
Reference in New Issue
Block a user